Ручные отправщики сообщений

pull/1/head
L_DelOff 2024-06-01 13:17:57 +03:00
parent f7bde10866
commit 80080dbde1
5 changed files with 144 additions and 2 deletions

34
pom.xml
View File

@ -15,13 +15,32 @@
<description>DemoKafkaStream</description>
<properties>
<java.version>22</java.version>
<spring-cloud.version>2023.0.1</spring-cloud.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
@ -32,6 +51,19 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-binder</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-function-web</artifactId>
</dependency>
</dependencies>
<build>

View File

@ -1,13 +1,49 @@
package ru.ldeloff.demokafkastream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.function.Consumer;
@SpringBootApplication
public class DemoKafkaStreamApplication {
private int i = 0;
public static void main(String[] args) {
SpringApplication.run(DemoKafkaStreamApplication.class, args);
}
// Пуляю в кафку 1
@Bean
public Consumer<String> consumer1() {
return s -> System.out.println("Data Consumer 1 (Kafka-1):" + s);
}
// // В данном примере продюссер отправляет сообщения периодически
// @Bean
// public Supplier<String> producer1() {
// return () -> {
// System.out.println("Data Supplier 1 (Kafka-1):" + ++i);
// return "Data Supplier 1 (Kafka-1):" + i;
// };
// }
// Пуляю в кафку 2
@Bean
public Consumer<String> consumer2() {
return s -> System.out.println("Data Consumer 2 (Kafka-2):" + s);
}
// // В данном примере продюссер отправляет сообщения периодически
// @Bean
// public Supplier<String> producer2() {
// return () -> {
// System.out.println("Data Supplier 2 (Kafka-2):" + ++i);
// return "Data Supplier 2 (Kafka-2):" + i;
// };
// }
}

View File

@ -0,0 +1,30 @@
package ru.ldeloff.demokafkastream;
import lombok.RequiredArgsConstructor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequiredArgsConstructor
public class controller {
private final StreamBridge streamBridge;
@GetMapping(value = "/test1")
public String test1() {
String str = "Produce manual (kafka 1) " + System.currentTimeMillis();
System.out.println(str);
streamBridge.send("producer1-out-0", str);
return "test OK";
}
@GetMapping(value = "/test2")
public String test2() {
String str = "Produce manual (kafka 2) " + System.currentTimeMillis();
System.out.println(str);
streamBridge.send("producer2-out-0", str);
return "test OK";
}
}

View File

@ -1 +0,0 @@
spring.application.name=DemoKafkaStream

View File

@ -0,0 +1,45 @@
spring:
application:
name: DemoKafkaStream
cloud:
function:
definition: producer1;consumer1;producer2;consumer2
stream:
bindings:
producer1-out-0:
binder: kafka-1
destination: kafka_1_topic
consumer1-in-0:
binder: kafka-1
destination: kafka_1_topic
producer2-out-0:
binder: kafka-2
destination: kafka_2_topic
consumer2-in-0:
binder: kafka-2
destination: kafka_2_topic
binders:
kafka-1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers:
- 192.168.10.101:29092
kafka-2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers:
- 192.168.10.101:39092