Добавил key #1

Merged
L_DelOff merged 1 commits from add-key into master 2024-06-02 22:01:25 +03:00
2 changed files with 12 additions and 16 deletions

View File

@ -1,26 +1,17 @@
package ru.ldeloff.demokafkastream;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.*;
import org.apache.kafka.common.network.Mode;
import org.apache.logging.log4j.CloseableThreadContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.MessageSource;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.apache.kafka.common.serialization.StringSerializer;
import java.io.DataInput;
import java.io.IOException;
import java.io.Serializable;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
@ -48,22 +39,25 @@ public class DemoKafkaStreamApplication {
// Продюсер (сам периодически отправляет сообщения)
@Bean
public Supplier<Model> producerAuto1() {
public Supplier<Message<Model>> producerAuto1() {
return () -> {
Model message = new Model(++i);
// System.out.println("Produce auto 1:" + message);
return message;
return MessageBuilder.withPayload(message)
.setHeader(KafkaHeaders.KEY, String.valueOf(message.getId()))
.build();
};
}
// Вручную отправим сообщение
@GetMapping(value = "/test1")
public String test1() {
Model message = new Model(++i);
System.out.println("Produce manual (kafka 1): " + message);
streamBridge.send("producerManual1-out-0", message);
streamBridge.send("producerManual1-out-0",
MessageBuilder.withPayload(message)
.setHeader(KafkaHeaders.KEY, String.valueOf(message.getId()))
.build());
return "test OK";
}

View File

@ -44,6 +44,8 @@ spring:
stream:
kafka:
binder:
producer-properties:
key.serializer: "org.apache.kafka.common.serialization.StringSerializer"
brokers:
- 192.168.10.101:29092