|
|
|
|
@@ -1,26 +1,17 @@
|
|
|
|
|
package ru.ldeloff.demokafkastream;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonParser;
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
|
|
import lombok.*;
|
|
|
|
|
import org.apache.kafka.common.network.Mode;
|
|
|
|
|
import org.apache.logging.log4j.CloseableThreadContext;
|
|
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
|
import org.springframework.boot.SpringApplication;
|
|
|
|
|
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
|
|
|
|
import org.springframework.cloud.stream.function.StreamBridge;
|
|
|
|
|
import org.springframework.context.MessageSource;
|
|
|
|
|
import org.springframework.context.annotation.Bean;
|
|
|
|
|
import org.springframework.kafka.support.KafkaHeaders;
|
|
|
|
|
import org.springframework.messaging.Message;
|
|
|
|
|
import org.springframework.messaging.support.MessageBuilder;
|
|
|
|
|
import org.springframework.web.bind.annotation.GetMapping;
|
|
|
|
|
import org.springframework.web.bind.annotation.RestController;
|
|
|
|
|
import org.apache.kafka.common.serialization.StringSerializer;
|
|
|
|
|
|
|
|
|
|
import java.io.DataInput;
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.io.Serializable;
|
|
|
|
|
import java.util.Objects;
|
|
|
|
|
import java.util.function.Consumer;
|
|
|
|
|
import java.util.function.Supplier;
|
|
|
|
|
|
|
|
|
|
@@ -41,29 +32,32 @@ public class DemoKafkaStreamApplication {
|
|
|
|
|
// Консьюмер
|
|
|
|
|
@Bean
|
|
|
|
|
public Consumer<Model> consumerAuto1() {
|
|
|
|
|
return message -> {;
|
|
|
|
|
return message -> {
|
|
|
|
|
// System.out.println("Consume auto 1: " + message);
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Продюсер (сам периодически отправляет сообщения)
|
|
|
|
|
@Bean
|
|
|
|
|
public Supplier<Model> producerAuto1() {
|
|
|
|
|
public Supplier<Message<Model>> producerAuto1() {
|
|
|
|
|
return () -> {
|
|
|
|
|
Model message = new Model(++i);
|
|
|
|
|
// System.out.println("Produce auto 1:" + message);
|
|
|
|
|
return message;
|
|
|
|
|
return MessageBuilder.withPayload(message)
|
|
|
|
|
.setHeader(KafkaHeaders.KEY, String.valueOf(message.getId()))
|
|
|
|
|
.build();
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Вручную отправим сообщение
|
|
|
|
|
@GetMapping(value = "/test1")
|
|
|
|
|
public String test1() {
|
|
|
|
|
Model message = new Model(++i);
|
|
|
|
|
System.out.println("Produce manual (kafka 1): " + message);
|
|
|
|
|
streamBridge.send("producerManual1-out-0", message);
|
|
|
|
|
streamBridge.send("producerManual1-out-0",
|
|
|
|
|
MessageBuilder.withPayload(message)
|
|
|
|
|
.setHeader(KafkaHeaders.KEY, String.valueOf(message.getId()))
|
|
|
|
|
.build());
|
|
|
|
|
return "test OK";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|