Merge pull request 'Добавил key' (#1) from add-key into master

Reviewed-on: #1
master
L_DelOff 2024-06-02 22:01:24 +03:00
commit a5d6289f87
2 changed files with 12 additions and 16 deletions

View File

@ -1,26 +1,17 @@
package ru.ldeloff.demokafkastream; package ru.ldeloff.demokafkastream;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.*;
import org.apache.kafka.common.network.Mode;
import org.apache.logging.log4j.CloseableThreadContext;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.MessageSource;
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message; import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.DataInput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -48,22 +39,25 @@ public class DemoKafkaStreamApplication {
// Продюсер (сам периодически отправляет сообщения) // Продюсер (сам периодически отправляет сообщения)
@Bean @Bean
public Supplier<Model> producerAuto1() { public Supplier<Message<Model>> producerAuto1() {
return () -> { return () -> {
Model message = new Model(++i); Model message = new Model(++i);
// System.out.println("Produce auto 1:" + message); // System.out.println("Produce auto 1:" + message);
return message; return MessageBuilder.withPayload(message)
.setHeader(KafkaHeaders.KEY, String.valueOf(message.getId()))
.build();
}; };
} }
// Вручную отправим сообщение // Вручную отправим сообщение
@GetMapping(value = "/test1") @GetMapping(value = "/test1")
public String test1() { public String test1() {
Model message = new Model(++i); Model message = new Model(++i);
System.out.println("Produce manual (kafka 1): " + message); System.out.println("Produce manual (kafka 1): " + message);
streamBridge.send("producerManual1-out-0", message); streamBridge.send("producerManual1-out-0",
MessageBuilder.withPayload(message)
.setHeader(KafkaHeaders.KEY, String.valueOf(message.getId()))
.build());
return "test OK"; return "test OK";
} }

View File

@ -44,6 +44,8 @@ spring:
stream: stream:
kafka: kafka:
binder: binder:
producer-properties:
key.serializer: "org.apache.kafka.common.serialization.StringSerializer"
brokers: brokers:
- 192.168.10.101:29092 - 192.168.10.101:29092