Created
July 18, 2018 21:45
-
-
Save garyrussell/1f6969cc52dd95379153dcc26f2edd84 to your computer and use it in GitHub Desktop.
So51407542Application
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
spring.kafka.consumer.auto-offset-reset=earliest | |
spring.kafka.consumer.enable-auto-commit=false |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
foo:bar | |
foo:baz | |
a:bar | |
b:baz | |
a:bar | |
b:baz | |
ConsumerRecord(topic = topicFinal.c, partition = 0, offset = 0, CreateTime = 1531949935167, serialized key size = 3, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = foo, value = barbaz) | |
ConsumerRecord(topic = topicFinal.c, partition = 0, offset = 1, CreateTime = 1531949935180, serialized key size = 3, serialized value size = 6, headers = RecordHeaders(headers = [], isReadOnly = false), key = foo, value = barbaz) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>com.example</groupId> | |
<artifactId>so51407542</artifactId> | |
<version>0.0.1-SNAPSHOT</version> | |
<packaging>jar</packaging> | |
<name>so51407542</name> | |
<description>Demo project for Spring Boot</description> | |
<parent> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-parent</artifactId> | |
<version>2.0.3.RELEASE</version> | |
<relativePath/> <!-- lookup parent from repository --> | |
</parent> | |
<properties> | |
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | |
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> | |
<java.version>1.8</java.version> | |
</properties> | |
<dependencies> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.kafka</groupId> | |
<artifactId>spring-kafka</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.apache.kafka</groupId> | |
<artifactId>kafka-streams</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-starter-test</artifactId> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.springframework.boot</groupId> | |
<artifactId>spring-boot-maven-plugin</artifactId> | |
</plugin> | |
</plugins> | |
</build> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example; | |
import java.util.Properties; | |
import org.apache.kafka.clients.admin.NewTopic; | |
import org.apache.kafka.clients.consumer.ConsumerRecord; | |
import org.apache.kafka.common.serialization.Serdes.StringSerde; | |
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.KeyValue; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.apache.kafka.streams.kstream.KStreamBuilder; | |
import org.apache.kafka.streams.kstream.KTable; | |
import org.springframework.boot.ApplicationRunner; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.kafka.annotation.KafkaListener; | |
import org.springframework.kafka.core.KafkaTemplate; | |
@SpringBootApplication | |
public class So51407542Application { | |
private static final String SUFFIX = "c"; | |
private static final String TOPIC_A1 = "topicA1" + SUFFIX; | |
private static final String TOPIC_A2 = "topicA2" + SUFFIX; | |
private static final String TOPIC_B1 = "topicB1" + SUFFIX; | |
private static final String TOPIC_B2 = "topicB2" + SUFFIX; | |
private static final String TOPIC_FINAL = "topicFinal." + SUFFIX; | |
public static void main(String[] args) { | |
SpringApplication.run(So51407542Application.class, args).close(); | |
} | |
@Bean | |
public ApplicationRunner runner(KafkaTemplate<String, String> template) { | |
return args -> { | |
template.send(TOPIC_A1, 0, "foo", "bar"); | |
template.send(TOPIC_B1, 0, "foo", "baz"); | |
KStreamBuilder kstreamBuilder = new KStreamBuilder(); | |
StringSerde serde = new StringSerde(); | |
KTable<String, String> kTableA = kstreamBuilder.table(serde, serde, TOPIC_A2); | |
kstreamBuilder.stream(serde, serde, TOPIC_A1) | |
.map((k, v) -> { | |
System.out.println(k + ":" + v); | |
return new KeyValue<>(k, v); | |
}) | |
.to(serde, serde, TOPIC_A2); | |
kstreamBuilder.stream(serde, serde, TOPIC_B1) | |
.map((k, v) -> { | |
System.out.println(k + ":" + v); | |
return new KeyValue<>(k, v); | |
}) | |
.to(serde, serde, TOPIC_B2); | |
KTable<String, String> kTableB = kstreamBuilder.table(serde, serde, TOPIC_B2); | |
KTable<String, String> resultTable = kTableA.leftJoin(kTableB, (a, b) -> { | |
System.out.println("a:" + a); | |
System.out.println("b:" + b); | |
return a + b; | |
}); | |
resultTable.to(serde, serde, TOPIC_FINAL); | |
Properties configs = new Properties(); | |
configs.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); | |
configs.put(StreamsConfig.APPLICATION_ID_CONFIG, "So51407542Application"); | |
KafkaStreams ks = new KafkaStreams(kstreamBuilder, configs); | |
ks.start(); | |
Thread.sleep(10_000); | |
}; | |
} | |
@KafkaListener(id = "foo", topics = TOPIC_FINAL) | |
public void in(ConsumerRecord<?, ?> in) { | |
System.out.println(in); | |
} | |
@Bean | |
public NewTopic topicA1() { | |
return new NewTopic(TOPIC_A1, 1, (short) 1); | |
} | |
@Bean | |
public NewTopic topicA2() { | |
return new NewTopic(TOPIC_A2, 1, (short) 1); | |
} | |
@Bean | |
public NewTopic topicB1() { | |
return new NewTopic(TOPIC_B1, 1, (short) 1); | |
} | |
@Bean | |
public NewTopic topicB2() { | |
return new NewTopic(TOPIC_B2, 1, (short) 1); | |
} | |
@Bean | |
public NewTopic topicFinal() { | |
return new NewTopic(TOPIC_FINAL, 1, (short) 1); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment