diff --git a/pom.xml b/pom.xml index 1a6441f..80a425a 100644 --- a/pom.xml +++ b/pom.xml @@ -15,13 +15,32 @@ DemoKafkaStream 22 + 2023.0.1 + + + + org.springframework.cloud + spring-cloud-dependencies + ${spring-cloud.version} + pom + import + + + org.springframework.boot spring-boot-starter - + + org.springframework.cloud + spring-cloud-stream + + + org.springframework.cloud + spring-cloud-starter-stream-kafka + org.projectlombok lombok @@ -32,6 +51,19 @@ spring-boot-starter-test test + + org.springframework.cloud + spring-cloud-stream-test-binder + test + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.cloud + spring-cloud-function-web + diff --git a/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java b/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java index 10568fb..ba2e898 100644 --- a/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java +++ b/src/main/java/ru/ldeloff/demokafkastream/DemoKafkaStreamApplication.java @@ -1,13 +1,49 @@ package ru.ldeloff.demokafkastream; + import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +import java.util.function.Consumer; @SpringBootApplication public class DemoKafkaStreamApplication { + private int i = 0; + public static void main(String[] args) { SpringApplication.run(DemoKafkaStreamApplication.class, args); } + // Пуляю в кафку 1 + @Bean + public Consumer consumer1() { + return s -> System.out.println("Data Consumer 1 (Kafka-1):" + s); + } + +// // В данном примере продюссер отправляет сообщения периодически +// @Bean +// public Supplier producer1() { +// return () -> { +// System.out.println("Data Supplier 1 (Kafka-1):" + ++i); +// return "Data Supplier 1 (Kafka-1):" + i; +// }; +// } + + // Пуляю в кафку 2 + @Bean + public Consumer consumer2() { + return s -> System.out.println("Data Consumer 2 (Kafka-2):" + s); + } + +// // В данном примере продюссер отправляет сообщения периодически +// @Bean +// public Supplier producer2() { +// return () -> { +// System.out.println("Data Supplier 2 (Kafka-2):" + ++i); +// return "Data Supplier 2 (Kafka-2):" + i; +// }; +// } + } diff --git a/src/main/java/ru/ldeloff/demokafkastream/controller.java b/src/main/java/ru/ldeloff/demokafkastream/controller.java new file mode 100644 index 0000000..3a6fec4 --- /dev/null +++ b/src/main/java/ru/ldeloff/demokafkastream/controller.java @@ -0,0 +1,30 @@ +package ru.ldeloff.demokafkastream; + +import lombok.RequiredArgsConstructor; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + + +@RestController +@RequiredArgsConstructor +public class controller { + + private final StreamBridge streamBridge; + + @GetMapping(value = "/test1") + public String test1() { + String str = "Produce manual (kafka 1) " + System.currentTimeMillis(); + System.out.println(str); + streamBridge.send("producer1-out-0", str); + return "test OK"; + } + + @GetMapping(value = "/test2") + public String test2() { + String str = "Produce manual (kafka 2) " + System.currentTimeMillis(); + System.out.println(str); + streamBridge.send("producer2-out-0", str); + return "test OK"; + } +} \ No newline at end of file diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties deleted file mode 100644 index 7237ad1..0000000 --- a/src/main/resources/application.properties +++ /dev/null @@ -1 +0,0 @@ -spring.application.name=DemoKafkaStream diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..fe594a0 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,45 @@ +spring: + application: + name: DemoKafkaStream + + cloud: + function: + definition: producer1;consumer1;producer2;consumer2 + stream: + bindings: + producer1-out-0: + binder: kafka-1 + destination: kafka_1_topic + consumer1-in-0: + binder: kafka-1 + destination: kafka_1_topic + + producer2-out-0: + binder: kafka-2 + destination: kafka_2_topic + consumer2-in-0: + binder: kafka-2 + destination: kafka_2_topic + + binders: + kafka-1: + type: kafka + environment: + spring: + cloud: + stream: + kafka: + binder: + brokers: + - 192.168.10.101:29092 + kafka-2: + type: kafka + environment: + spring: + cloud: + stream: + kafka: + binder: + brokers: + - 192.168.10.101:39092 +