Skip to content

Instantly share code, notes, and snippets.

@sheelprabhakar
Last active March 9, 2021 15:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sheelprabhakar/15abe891f5be7c583107af8ff7b7c964 to your computer and use it in GitHub Desktop.
Save sheelprabhakar/15abe891f5be7c583107af8ff7b7c964 to your computer and use it in GitHub Desktop.
Kafka Streams configuration for Spring Boot
/**
 * Spring configuration that enables Kafka Streams and registers a small topology:
 * weather readings are consumed from {@value #SOURCE_TOPIC}, readings whose temperature
 * lies outside the {@value #MIN_NORMAL_TEMP}..{@value #MAX_NORMAL_TEMP} range are printed
 * and forwarded to {@value #FILTERED_TOPIC}.
 */
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamConfig {

    /** Kafka Streams application id. Kafka Streams also uses it as the consumer group id. */
    private static final String APPLICATION_ID = "stream-1-weather";

    /** Topic the topology reads weather readings from. */
    private static final String SOURCE_TOPIC = "extreme-weather";

    /** Topic that receives only the readings classified as extreme. */
    private static final String FILTERED_TOPIC = "extreme-weather-filter";

    /** Readings strictly above this temperature are extreme (unit is whatever WeatherInfo uses). */
    private static final int MAX_NORMAL_TEMP = 43;

    /** Readings strictly below this temperature are extreme (unit is whatever WeatherInfo uses). */
    private static final int MIN_NORMAL_TEMP = -13;

    @Value("${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    /**
     * Builds the default Kafka Streams configuration bean.
     *
     * <p>No {@code group.id} is set here: Kafka Streams derives the consumer group from
     * {@code application.id} and overrides any user-supplied value, so such an entry
     * would have no effect.
     *
     * @return the streams configuration holding bootstrap servers, application id and
     *         the default key/value serdes
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration streamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // Only the class is registered here; Kafka Streams instantiates the serde itself,
        // so there is no point in creating a throw-away JsonSerde instance.
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        // The serde instance created by Kafka Streams has no target type of its own;
        // give it one so values without type headers can still be read as WeatherInfo.
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, WeatherInfo.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    /**
     * Declares the weather topology on the shared {@link StreamsBuilder}.
     *
     * @param builder the streams builder the topology is added to
     * @return the unfiltered source stream read from {@value #SOURCE_TOPIC}
     */
    @Bean
    public KStream<String, WeatherInfo> kStreamJson(StreamsBuilder builder) {
        // Always deserialize values as WeatherInfo and ignore type headers. This makes the
        // target type explicit instead of relying on key-header lookup for a value
        // deserializer, and removes the need to trust every package ("*").
        JsonDeserializer<WeatherInfo> deserializer =
                new JsonDeserializer<>(WeatherInfo.class, false);
        // Leave any type headers on the record untouched.
        deserializer.setRemoveTypeHeaders(false);
        JsonSerde<WeatherInfo> weatherSerde =
                new JsonSerde<>(new JsonSerializer<>(), deserializer);

        KStream<String, WeatherInfo> stream =
                builder.stream(SOURCE_TOPIC, Consumed.with(Serdes.String(), weatherSerde));

        stream.filter((key, value) -> isExtreme(value))
                // Print each record that passed the filter before it is written out.
                .peek((key, value) -> System.out.println("Inside filter " + value.getCity()))
                .to(FILTERED_TOPIC);

        return stream;
    }

    /**
     * Tells whether a reading is outside the normal temperature range.
     *
     * @param value the reading to test; {@code null} (e.g. a tombstone record) is never extreme
     * @return {@code true} if the temperature is above {@value #MAX_NORMAL_TEMP}
     *         or below {@value #MIN_NORMAL_TEMP}
     */
    private static boolean isExtreme(WeatherInfo value) {
        return value != null
                && (value.getTemp() > MAX_NORMAL_TEMP || value.getTemp() < MIN_NORMAL_TEMP);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment