Skip to content

Instantly share code, notes, and snippets.

@careykevin
Last active December 19, 2019 16:32
Show Gist options
  • Save careykevin/20e63e9838cb68de10bd4358681c6930 to your computer and use it in GitHub Desktop.
Save careykevin/20e63e9838cb68de10bd4358681c6930 to your computer and use it in GitHub Desktop.
Spring Boot - Kafka Streams example
package com.careykevin.kafkastreamsdemo;
import com.careykevin.kafkastreamsdemo.util.StreamListenerService;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.streams.KeyValue.pair;
@RestController
@SpringBootApplication
@RequiredArgsConstructor
@EnableBinding(KafkaStreamsDemoApplication.Bindings.class)
public class KafkaStreamsDemoApplication {
// Input - Kafka topic containing Score objects, with playerId and score
private static final String INPUT_TOPIC_NAME = "scores";
// Output - Table with the total score for each player
private static final String OUTPUT_TABLE_NAME = "totalScoreByPlayerId";
private final StreamListenerService streamListenerService;
// Kafka Streams application to read input Scores and maintain table of <playerId><totalScore>
@StreamListener
public void scoresStreamListener(@Input(INPUT_TOPIC_NAME) KStream<?, Score> scores) {
scores
.map((key, score) -> pair(score.getPlayerId(), score.getScore()))
.groupByKey(Grouped.with(Serdes.Long(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(OUTPUT_TABLE_NAME));
}
// Provide REST API for totalScores by playerId, plain old Spring REST controller
@GetMapping("players/{playerId}")
public Map<String, Long> getPlayerTotalScore(@PathVariable long playerId) {
// Kafka Streams applications identified by method name in Spring
String streamsBuilder = "&stream-builder-KafkaStreamsDemoApplication-scoresStreamListener";
ReadOnlyKeyValueStore<Long, Long> table = streamListenerService.getTable(streamsBuilder, OUTPUT_TABLE_NAME);
return singletonMap("totalScore", table.get(playerId));
}
interface Bindings {
@Input(INPUT_TOPIC_NAME)
KStream<?, Score> scores();
}
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsDemoApplication.class, args);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment