From 6360df290e39759b5f7c6d07c3e9d6b6624a67ff Mon Sep 17 00:00:00 2001 From: L_DelOff Date: Sun, 2 Jun 2024 21:59:02 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB=20ke?= =?UTF-8?q?y?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DemoKafkaStreamApplication.java | 26 +++++++------------ src/main/resources/application.yml | 2 ++ 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java b/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java index 93e9522..c8d5102 100644 --- a/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java +++ b/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java @@ -1,26 +1,17 @@ package ru.ldeloff.demokafkastream; -import com.fasterxml.jackson.core.JsonParser; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.*; -import org.apache.kafka.common.network.Mode; -import org.apache.logging.log4j.CloseableThreadContext; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.function.StreamBridge; -import org.springframework.context.MessageSource; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; -import org.apache.kafka.common.serialization.StringSerializer; -import java.io.DataInput; -import java.io.IOException; -import java.io.Serializable; -import java.util.Objects; import java.util.function.Consumer; import java.util.function.Supplier; @@ -48,22 +39,25 @@ public class DemoKafkaStreamApplication { // Продюсер (сам периодически отправляет сообщения) @Bean - public Supplier producerAuto1() { + public Supplier> producerAuto1() { return () -> { Model message = new Model(++i); // System.out.println("Produce auto 1:" + message); - return message; + return MessageBuilder.withPayload(message) + .setHeader(KafkaHeaders.KEY, String.valueOf(message.getId())) + .build(); }; } - - // Вручную отправим сообщение @GetMapping(value = "/test1") public String test1() { Model message = new Model(++i); System.out.println("Produce manual (kafka 1): " + message); - streamBridge.send("producerManual1-out-0", message); + streamBridge.send("producerManual1-out-0", + MessageBuilder.withPayload(message) + .setHeader(KafkaHeaders.KEY, String.valueOf(message.getId())) + .build()); return "test OK"; } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 678a786..c849bf9 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -44,6 +44,8 @@ spring: stream: kafka: binder: + producer-properties: + key.serializer: "org.apache.kafka.common.serialization.StringSerializer" brokers: - 192.168.10.101:29092 -- 2.40.1