diff --git a/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java b/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java index 93e9522..c8d5102 100644 --- a/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java +++ b/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java @@ -1,26 +1,17 @@ package ru.ldeloff.demokafkastream; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.*; -import org.apache.kafka.common.network.Mode; -import org.apache.logging.log4j.CloseableThreadContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.function.StreamBridge; -import org.springframework.context.MessageSource; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; -import org.apache.kafka.common.serialization.StringSerializer; -import java.io.DataInput; -import java.io.IOException; -import java.io.Serializable; -import java.util.Objects; import java.util.function.Consumer; import java.util.function.Supplier; @@ -48,22 +39,25 @@ public class DemoKafkaStreamApplication { // Продюсер (сам периодически отправляет сообщения) @Bean - public Supplier producerAuto1() { + public Supplier> producerAuto1() { return () -> { Model message = new Model(++i); // System.out.println("Produce auto 1:" + message); - return message; + return MessageBuilder.withPayload(message) + .setHeader(KafkaHeaders.KEY, String.valueOf(message.getId())) + .build(); }; } - - // Вручную отправим сообщение @GetMapping(value = "/test1") public String test1() { Model message = new Model(++i); System.out.println("Produce manual (kafka 1): " + message); - streamBridge.send("producerManual1-out-0", message); + streamBridge.send("producerManual1-out-0", + MessageBuilder.withPayload(message) + .setHeader(KafkaHeaders.KEY, String.valueOf(message.getId())) + .build()); return "test OK"; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 678a786..c849bf9 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -44,6 +44,8 @@ spring: stream: kafka: binder: + producer-properties: + key.serializer: "org.apache.kafka.common.serialization.StringSerializer" brokers: - 192.168.10.101:29092