Skip to content

Instantly share code, notes, and snippets.

@garyrussell
Created July 18, 2018 21:45
Show Gist options
  • Save garyrussell/1f6969cc52dd95379153dcc26f2edd84 to your computer and use it in GitHub Desktop.
Save garyrussell/1f6969cc52dd95379153dcc26f2edd84 to your computer and use it in GitHub Desktop.
So51407542Application
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
foo:bar
foo:baz
a:bar
b:baz
a:bar
b:baz
ConsumerRecord(topic = topicFinal.c, partition = 0, offset = 0, CreateTime = 1531949935167, serialized key size = 3, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = foo, value = barbaz)
ConsumerRecord(topic = topicFinal.c, partition = 0, offset = 1, CreateTime = 1531949935180, serialized key size = 3, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = foo, value = barbaz)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>so51407542</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>so51407542</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
package com.example;
import java.util.Properties;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootApplication
public class So51407542Application {
private static final String SUFFIX = "c";
private static final String TOPIC_A1 = "topicA1" + SUFFIX;
private static final String TOPIC_A2 = "topicA2" + SUFFIX;
private static final String TOPIC_B1 = "topicB1" + SUFFIX;
private static final String TOPIC_B2 = "topicB2" + SUFFIX;
private static final String TOPIC_FINAL = "topicFinal." + SUFFIX;
public static void main(String[] args) {
SpringApplication.run(So51407542Application.class, args).close();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send(TOPIC_A1, 0, "foo", "bar");
template.send(TOPIC_B1, 0, "foo", "baz");
KStreamBuilder kstreamBuilder = new KStreamBuilder();
StringSerde serde = new StringSerde();
KTable<String, String> kTableA = kstreamBuilder.table(serde, serde, TOPIC_A2);
kstreamBuilder.stream(serde, serde, TOPIC_A1)
.map((k, v) -> {
System.out.println(k + ":" + v);
return new KeyValue<>(k, v);
})
.to(serde, serde, TOPIC_A2);
kstreamBuilder.stream(serde, serde, TOPIC_B1)
.map((k, v) -> {
System.out.println(k + ":" + v);
return new KeyValue<>(k, v);
})
.to(serde, serde, TOPIC_B2);
KTable<String, String> kTableB = kstreamBuilder.table(serde, serde, TOPIC_B2);
KTable<String, String> resultTable = kTableA.leftJoin(kTableB, (a, b) -> {
System.out.println("a:" + a);
System.out.println("b:" + b);
return a + b;
});
resultTable.to(serde, serde, TOPIC_FINAL);
Properties configs = new Properties();
configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "So51407542Application");
KafkaStreams ks = new KafkaStreams(kstreamBuilder, configs);
ks.start();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "foo", topics = TOPIC_FINAL)
public void in(ConsumerRecord<?, ?> in) {
System.out.println(in);
}
@Bean
public NewTopic topicA1() {
return new NewTopic(TOPIC_A1, 1, (short) 1);
}
@Bean
public NewTopic topicA2() {
return new NewTopic(TOPIC_A2, 1, (short) 1);
}
@Bean
public NewTopic topicB1() {
return new NewTopic(TOPIC_B1, 1, (short) 1);
}
@Bean
public NewTopic topicB2() {
return new NewTopic(TOPIC_B2, 1, (short) 1);
}
@Bean
public NewTopic topicFinal() {
return new NewTopic(TOPIC_FINAL, 1, (short) 1);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment