diff --git a/KafkaStream/pom.xml b/KafkaStream/pom.xml new file mode 100644 index 0000000..1b30429 --- /dev/null +++ b/KafkaStream/pom.xml @@ -0,0 +1,102 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 3.3.0 + + + ru.ldeloff + KafkaStream + 0.0.1-SNAPSHOT + KafkaStream + KafkaStream + + 22 + 2023.0.1 + + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + + + + org.springframework.boot + spring-boot-starter + + + org.springframework.cloud + spring-cloud-stream + + + org.springframework.cloud + spring-cloud-starter-stream-kafka + + + org.springframework.boot + spring-boot-starter-webflux + + + io.github.resilience4j + resilience4j-spring-boot2 + + + org.springframework.boot + spring-boot-starter-actuator + + + org.springframework.boot + spring-boot-starter-aop + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + org.springframework.cloud + spring-cloud-stream-test-binder + test + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.cloud + spring-cloud-function-web + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + + diff --git a/KafkaStream/src/main/java/ru/ldeloff/kafkastream/KafkaStreamApplication.java b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/KafkaStreamApplication.java new file mode 100644 index 0000000..7f41b70 --- /dev/null +++ b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/KafkaStreamApplication.java @@ -0,0 +1,13 @@ +package ru.ldeloff.kafkastream; + + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + + +@SpringBootApplication +public class KafkaStreamApplication { + public static void main(String[] args) { + SpringApplication.run(KafkaStreamApplication.class, args); + } +} diff --git a/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/CircuitBreakerConfig.java b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/CircuitBreakerConfig.java new file mode 100644 index 0000000..921a787 --- /dev/null +++ b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/CircuitBreakerConfig.java @@ -0,0 +1,18 @@ +package ru.ldeloff.kafkastream.config; + +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.CircuitBreakerRegistry; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class CircuitBreakerConfig { + private final CircuitBreakerRegistry circuitBreakerRegistry; + + public CircuitBreakerConfig(CircuitBreakerRegistry circuitBreakerRegistry){ + this.circuitBreakerRegistry = circuitBreakerRegistry; + } + + public CircuitBreaker circuitBreaker(String cbName) { + return circuitBreakerRegistry.circuitBreaker(cbName); + } +} diff --git a/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/CircuitBreakerImpl.java b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/CircuitBreakerImpl.java new file mode 100644 index 0000000..f6f663b --- /dev/null +++ b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/CircuitBreakerImpl.java @@ -0,0 +1,38 @@ +package ru.ldeloff.kafkastream.config; + +import io.github.resilience4j.circuitbreaker.CircuitBreaker; +import io.github.resilience4j.circuitbreaker.event.CircuitBreakerOnStateTransitionEvent; +import org.springframework.cloud.stream.binding.BindingsLifecycleController; +import org.springframework.cloud.stream.endpoint.BindingsEndpoint; +import org.springframework.context.annotation.Configuration; + +@Configuration +public class CircuitBreakerImpl { + private final CircuitBreaker circuitBreaker; + private final BindingsEndpoint bindingsEndpoint; + + protected CircuitBreakerImpl(BindingsEndpoint bindingsEndpoint, + CircuitBreakerConfig circuitBreakerConfig) { + this.bindingsEndpoint = bindingsEndpoint; + this.circuitBreaker = circuitBreakerConfig.circuitBreaker("consumerCircuitBreaker"); + + circuitBreaker.getEventPublisher().onStateTransition(this::onStateChangeEvent); + } + + private void onStateChangeEvent(CircuitBreakerOnStateTransitionEvent event) { + System.out.println("WARNING"); + + switch (event.getStateTransition().getToState()) { + case OPEN: + System.out.println("consumerAutoTestCircuitBreaker1 open"); + bindingsEndpoint.changeState("consumerAutoTestCircuitBreaker1-in-0", BindingsLifecycleController.State.STOPPED); + break; + case CLOSED: + System.out.println("consumerAutoTestCircuitBreaker1 closed"); + case HALF_OPEN: + System.out.println("consumerAutoTestCircuitBreaker1 half_open"); + bindingsEndpoint.changeState("consumerAutoTestCircuitBreaker1-in-0", BindingsLifecycleController.State.STARTED); + break; + } + } +} \ No newline at end of file diff --git a/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/KafkaConsumerConfig.java b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/KafkaConsumerConfig.java new file mode 100644 index 0000000..c8f56ab --- /dev/null +++ b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/KafkaConsumerConfig.java @@ -0,0 +1,56 @@ +package ru.ldeloff.kafkastream.config; + +import lombok.RequiredArgsConstructor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import ru.ldeloff.kafkastream.model.Model; +import ru.ldeloff.kafkastream.service.ModelService; + +import java.util.function.Consumer; + +@Configuration +@RequiredArgsConstructor +public class KafkaConsumerConfig { + private final ModelService modelService; + + @Bean + public Consumer consumerAuto1() { + return message -> { + // System.out.println("Consume auto 1: " + message); + }; + } + + @Bean + public Consumer consumerManual1() { + return message -> { + System.out.println("Consume manual 1: " + message); + }; + } + + @Bean + public Consumer consumerAutoTestCircuitBreaker1() { + return message -> { + System.out.println("Consume Auto CB 1: " + message); + try { + modelService.sendModel(message); + } catch (Exception e) { + System.out.println(e.getClass()); + } + + }; + } + + @Bean + public Consumer consumerAuto2() { + return message -> { + // System.out.println("Consume auto 2: " + message); + }; + } + + @Bean + public Consumer consumerManual2() { + return message -> { + System.out.println("Consume manual 2: " + message); + }; + } +} diff --git a/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/KafkaProducerConfig.java b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/KafkaProducerConfig.java new file mode 100644 index 0000000..1dedd8a --- /dev/null +++ b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/KafkaProducerConfig.java @@ -0,0 +1,68 @@ +package ru.ldeloff.kafkastream.config; + +import lombok.RequiredArgsConstructor; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import ru.ldeloff.kafkastream.model.Model; + +import java.util.function.Supplier; + +@Configuration +@RequiredArgsConstructor +public class KafkaProducerConfig { + + private int i = 0; + private final StreamBridge streamBridge; + + // Периодически отправляет сообщения + @Bean + public Supplier> producerAuto1() { + return () -> { + Model message = new Model(++i); + // System.out.println("Produce auto 1:" + message); + return MessageBuilder.withPayload(message) + .setHeader(KafkaHeaders.KEY, String.valueOf(message.getId())) + .build(); + }; + } + + // Ручная отправка + public void sendMessage1(Model message) { + System.out.println("Produce manual (kafka 1): " + message); + streamBridge.send("producerManual1-out-0", + MessageBuilder.withPayload(message) + .setHeader(KafkaHeaders.KEY, String.valueOf(message.getId())) + .build()); + } + + // Для Circuit Breaker + @Bean + public Supplier> producerAutoTestCircuitBreaker1() { + return () -> { + Model message = new Model(++i); + System.out.println("Produce auto (CB) 1:" + message); + return MessageBuilder.withPayload(message) + .setHeader(KafkaHeaders.KEY, String.valueOf(message.getId())) + .build(); + }; + } + + /* Кафка 2 */ + @Bean + public Supplier producerAuto2() { + return () -> { + Model message = new Model(++i); + // System.out.println("Produce auto 2:" + message); + return message; + }; + } + + public void sendMessage2(Model message) { + System.out.println("Produce manual (kafka 2): " + message); + streamBridge.send("producerManual2-out-0", message); + } +} diff --git a/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/KafkaStreamConfig.java b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/KafkaStreamConfig.java new file mode 100644 index 0000000..57723f6 --- /dev/null +++ b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/KafkaStreamConfig.java @@ -0,0 +1,20 @@ +package ru.ldeloff.kafkastream.config; + +import org.springframework.cloud.stream.binding.BindingsLifecycleController; +import org.springframework.cloud.stream.binding.InputBindingLifecycle; +import org.springframework.cloud.stream.binding.OutputBindingLifecycle; +import org.springframework.cloud.stream.endpoint.BindingsEndpoint; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.List; + +@Configuration +public class KafkaStreamConfig { + + @Bean + public BindingsEndpoint bindingsEndpoint(List inputBindings, + List outputBindings) { + return new BindingsEndpoint(new BindingsLifecycleController(inputBindings, outputBindings)); + } +} diff --git a/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/WebConfig.java b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/WebConfig.java new file mode 100644 index 0000000..97f4fa4 --- /dev/null +++ b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/config/WebConfig.java @@ -0,0 +1,28 @@ +package ru.ldeloff.kafkastream.config; + +import io.netty.channel.ChannelOption; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpHeaders; +import org.springframework.http.MediaType; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.netty.http.client.HttpClient; + +@Configuration +public class WebConfig { + + @Bean + public WebClient webClient() { + return WebClient.builder() + .baseUrl("http://localhost:3000") + .clientConnector(new ReactorClientHttpConnector( + HttpClient.create() + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100) + .wiretap(true) + ) + ) + .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) + .build(); + } +} \ No newline at end of file diff --git a/KafkaStream/src/main/java/ru/ldeloff/kafkastream/controller/Controller.java b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/controller/Controller.java new file mode 100644 index 0000000..b13001d --- /dev/null +++ b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/controller/Controller.java @@ -0,0 +1,41 @@ +package ru.ldeloff.kafkastream.controller; + +import lombok.RequiredArgsConstructor; +import org.springframework.cloud.stream.binding.BindingsLifecycleController; +import org.springframework.cloud.stream.endpoint.BindingsEndpoint; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import ru.ldeloff.kafkastream.config.KafkaProducerConfig; +import ru.ldeloff.kafkastream.model.Model; + +@RestController +@RequiredArgsConstructor +public class Controller { + int i = 0; + private final KafkaProducerConfig kafkaProducerConfig; + private final BindingsEndpoint bindingsEndpoint; + + @GetMapping(value = "/test1") + public String test1() { + kafkaProducerConfig.sendMessage1(new Model(++i)); + return "test OK, i = " + i; + } + + @GetMapping(value = "/test2") + public String test2() { + kafkaProducerConfig.sendMessage2(new Model(++i)); + return "test OK, i = " + i; + } + + @GetMapping(value = "/test3") + public String test3() { + bindingsEndpoint.changeState("consumerAutoTestCircuitBreaker1-in-0", BindingsLifecycleController.State.STOPPED); + return "test OK"; + } + + @GetMapping(value = "/test4") + public String test4() { + bindingsEndpoint.changeState("consumerAutoTestCircuitBreaker1-in-0", BindingsLifecycleController.State.STARTED); + return "test OK"; + } +} diff --git a/KafkaStream/src/main/java/ru/ldeloff/kafkastream/model/Model.java b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/model/Model.java new file mode 100644 index 0000000..f94f88c --- /dev/null +++ b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/model/Model.java @@ -0,0 +1,19 @@ +package ru.ldeloff.kafkastream.model; + +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import lombok.ToString; + +@Getter +@Setter +@ToString +@NoArgsConstructor +public class Model { + private int id; + private long time; + public Model(int id) { + this.id = id; + this.time = System.currentTimeMillis(); + } +} diff --git a/KafkaStream/src/main/java/ru/ldeloff/kafkastream/service/ModelService.java b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/service/ModelService.java new file mode 100644 index 0000000..fb50a9c --- /dev/null +++ b/KafkaStream/src/main/java/ru/ldeloff/kafkastream/service/ModelService.java @@ -0,0 +1,27 @@ +package ru.ldeloff.kafkastream.service; + +import io.github.resilience4j.circuitbreaker.annotation.CircuitBreaker; +import io.github.resilience4j.retry.annotation.Retry; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Service; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; +import ru.ldeloff.kafkastream.model.Model; + +@Service +@RequiredArgsConstructor +public class ModelService { + private final WebClient webClient; + + @CircuitBreaker(name = "consumerCircuitBreaker") + @Retry(name = "consumerCircuitBreaker") + public void sendModel(Model model) { + System.out.println("Sending Model: " + model); + Model model1 = webClient.post() + .uri("/models") + .bodyValue(BodyInserters.fromValue(model)) + .retrieve() + .bodyToMono(Model.class).block(); + System.out.println(model1); + } +} diff --git a/src/main/resources/application.yml b/KafkaStream/src/main/resources/application.yml similarity index 52% rename from src/main/resources/application.yml rename to KafkaStream/src/main/resources/application.yml index c849bf9..66802c6 100644 --- a/src/main/resources/application.yml +++ b/KafkaStream/src/main/resources/application.yml @@ -1,10 +1,12 @@ spring: application: name: DemoKafkaStream + main: + allow-bean-definition-overriding: true cloud: function: - definition: producerAuto1;consumerAuto1;producerManual1;consumerManual1;producerAuto2;consumerAuto2;producerManual2;consumerManual2 + definition: producerAuto1;consumerAuto1;producerManual1;consumerManual1;producerAuto2;consumerAuto2;producerManual2;consumerManual2;producerAutoTestCircuitBreaker1;consumerAutoTestCircuitBreaker1 stream: bindings: producerAuto1-out-0: @@ -21,6 +23,14 @@ spring: binder: kafka-1 destination: kafka_1_topic_manual + producerAutoTestCircuitBreaker1-out-0: + binder: kafka-1 + destination: kafka_1_topic_auto_CB + consumerAutoTestCircuitBreaker1-in-0: + binder: kafka-1 + destination: kafka_1_topic_auto_CB + group: consumerAutoTestCircuitBreaker + producerAuto2-out-0: binder: kafka-2 destination: kafka_2_topic_auto @@ -59,3 +69,50 @@ spring: binder: brokers: - 192.168.10.101:39092 + +#Resilience4j circuit breaker config +resilience4j.circuitbreaker: + instances: + consumerCircuitBreaker: + slidingWindowType: COUNT_BASED + minimumNumberOfCalls: 1 + permittedNumberOfCallsInHalfOpenState: 10 + failureRateThreshold: 50 + automaticTransitionFromOpenToHalfOpenEnabled: true + slidingWindowSize: 10 + waitDurationInOpenState: 10s + registerHealthIndicator: true + recordExceptions: org.springframework.web.reactive.function.client.WebClientRequestException + ignoreExceptions: + +#Resilience4j retry config +resilience4j.retry: + instances: + consumerCircuitBreaker: + maxAttempts: 3 + waitDuration: 500 + retryExceptions: org.springframework.web.reactive.function.client.WebClientRequestException + ignoreExceptions: + metrics: + enabled: true + legacy: + enabled: true + +management: + endpoints: + web: + exposure: + include: bindings,health + + health: + binders: + enabled: true + circuitbreakers: + enabled: true + ratelimiters: + enabled:true + + endpoint: + health: + show-details: ALWAYS + diff --git a/src/test/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplicationTests.java b/KafkaStream/src/test/java/ru/ldeloff/kafkastream/KafkaStreamApplicationTests.java similarity index 67% rename from src/test/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplicationTests.java rename to KafkaStream/src/test/java/ru/ldeloff/kafkastream/KafkaStreamApplicationTests.java index 970df1f..704b355 100644 --- a/src/test/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplicationTests.java +++ b/KafkaStream/src/test/java/ru/ldeloff/kafkastream/KafkaStreamApplicationTests.java @@ -1,10 +1,10 @@ -package ru.ldeloff.demokafkastream; +package ru.ldeloff.kafkastream; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest -class DemoKafkaStreamApplicationTests { +class KafkaStreamApplicationTests { @Test void contextLoads() { diff --git a/Server/pom.xml b/Server/pom.xml new file mode 100644 index 0000000..deed2cf --- /dev/null +++ b/Server/pom.xml @@ -0,0 +1,59 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 3.3.0 + + + ru.ldeloff + Server + 0.0.1-SNAPSHOT + Server + Server + + 17 + + + + org.springframework.boot + spring-boot-starter-web + + + + org.springframework.boot + spring-boot-configuration-processor + true + + + org.projectlombok + lombok + true + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.projectlombok + lombok + + + + + + + + diff --git a/Server/src/main/java/ru/ldeloff/server/ServerApplication.java b/Server/src/main/java/ru/ldeloff/server/ServerApplication.java new file mode 100644 index 0000000..11e75a6 --- /dev/null +++ b/Server/src/main/java/ru/ldeloff/server/ServerApplication.java @@ -0,0 +1,13 @@ +package ru.ldeloff.server; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class ServerApplication { + + public static void main(String[] args) { + SpringApplication.run(ServerApplication.class, args); + } + +} diff --git a/Server/src/main/java/ru/ldeloff/server/controller/ModelController.java b/Server/src/main/java/ru/ldeloff/server/controller/ModelController.java new file mode 100644 index 0000000..8692f4e --- /dev/null +++ b/Server/src/main/java/ru/ldeloff/server/controller/ModelController.java @@ -0,0 +1,15 @@ +package ru.ldeloff.server.controller; + +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RestController; +import ru.ldeloff.server.model.Model; + +@RestController +public class ModelController { + @PostMapping(value = "/models") + public Model test2(Model model) { + int id = model.getId(); + model.setId(id + 1000000); + return model; + } +} diff --git a/src/main/java/ru/ldeloff/demokafkastream/Model.java b/Server/src/main/java/ru/ldeloff/server/model/Model.java similarity index 89% rename from src/main/java/ru/ldeloff/demokafkastream/Model.java rename to Server/src/main/java/ru/ldeloff/server/model/Model.java index c4fa218..c5cb303 100644 --- a/src/main/java/ru/ldeloff/demokafkastream/Model.java +++ b/Server/src/main/java/ru/ldeloff/server/model/Model.java @@ -1,4 +1,4 @@ -package ru.ldeloff.demokafkastream; +package ru.ldeloff.server.model; import lombok.Getter; import lombok.NoArgsConstructor; diff --git a/Server/src/main/resources/application.yml b/Server/src/main/resources/application.yml new file mode 100644 index 0000000..ffb414c --- /dev/null +++ b/Server/src/main/resources/application.yml @@ -0,0 +1,6 @@ +server: + port: 3000 +spring: + application: + name: Server + diff --git a/Server/src/test/java/ru/ldeloff/server/ServerApplicationTests.java b/Server/src/test/java/ru/ldeloff/server/ServerApplicationTests.java new file mode 100644 index 0000000..5cad970 --- /dev/null +++ b/Server/src/test/java/ru/ldeloff/server/ServerApplicationTests.java @@ -0,0 +1,13 @@ +package ru.ldeloff.server; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class ServerApplicationTests { + + @Test + void contextLoads() { + } + +} diff --git a/pom.xml b/pom.xml index 80a425a..53bffc7 100644 --- a/pom.xml +++ b/pom.xml @@ -13,74 +13,5 @@ 0.0.1-SNAPSHOT DemoKafkaStream DemoKafkaStream - - 22 - 2023.0.1 - - - - - org.springframework.cloud - spring-cloud-dependencies - ${spring-cloud.version} - pom - import - - - - - - org.springframework.boot - spring-boot-starter - - - org.springframework.cloud - spring-cloud-stream - - - org.springframework.cloud - spring-cloud-starter-stream-kafka - - - org.projectlombok - lombok - true - - - org.springframework.boot - spring-boot-starter-test - test - - - org.springframework.cloud - spring-cloud-stream-test-binder - test - - - org.springframework.boot - spring-boot-starter-web - - - org.springframework.cloud - spring-cloud-function-web - - - - - - - org.springframework.boot - spring-boot-maven-plugin - - - - org.projectlombok - lombok - - - - - - diff --git a/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java b/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java deleted file mode 100644 index 95055e2..0000000 --- a/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java +++ /dev/null @@ -1,108 +0,0 @@ -package ru.ldeloff.demokafkastream; - - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.cloud.stream.function.StreamBridge; -import org.springframework.context.annotation.Bean; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.messaging.Message; -import org.springframework.messaging.support.MessageBuilder; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.util.function.Consumer; -import java.util.function.Supplier; - -@RestController -@SpringBootApplication -public class DemoKafkaStreamApplication { - - private int i = 0; - - @Autowired - private StreamBridge streamBridge; - - public static void main(String[] args) { - SpringApplication.run(DemoKafkaStreamApplication.class, args); - } - - /* Кафка 1 */ - // Консьюмер - @Bean - public Consumer consumerAuto1() { - return message -> { - // System.out.println("Consume auto 1: " + message); - }; - } - - // Продюсер (сам периодически отправляет сообщения) - @Bean - public Supplier> producerAuto1() { - return () -> { - Model message = new Model(++i); - // System.out.println("Produce auto 1:" + message); - return MessageBuilder.withPayload(message) - .setHeader(KafkaHeaders.KEY, String.valueOf(message.getId())) - .build(); - }; - } - - // Вручную отправим сообщение - @GetMapping(value = "/test1") - public String test1() { - Model message = new Model(++i); - System.out.println("Produce manual (kafka 1): " + message); - streamBridge.send("producerManual1-out-0", - MessageBuilder.withPayload(message) - .setHeader(KafkaHeaders.KEY, String.valueOf(message.getId())) - .build()); - return "test OK"; - } - - // Консьюмер - @Bean - public Consumer consumerManual1() { - return message -> { - System.out.println("Consume manual 1: " + message); - }; - } - - - /* Кафка 2 */ - // Продюсер (сам периодически отправляет сообщения) - @Bean - public Supplier producerAuto2() { - return () -> { - Model message = new Model(++i); - // System.out.println("Produce auto 2:" + message); - return message; - }; - } - - // Консьюмер - @Bean - public Consumer consumerAuto2() { - return message -> { - // System.out.println("Consume auto 2: " + message); - }; - } - - // Вручную отправим сообщение - @GetMapping(value = "/test2") - public String test2() { - Model message = new Model(++i); - System.out.println("Produce manual (kafka 2): " + message); - streamBridge.send("producerManual2-out-0", message); - return "test OK"; - } - - // Консьюмер - @Bean - public Consumer consumerManual2() { - return message -> { - System.out.println("Consume manual 2: " + message); - }; - } -}