Last active
March 9, 2021 15:28
-
-
Save sheelprabhakar/15abe891f5be7c583107af8ff7b7c964 to your computer and use it in GitHub Desktop.
Kafka Streams configuration for Spring Boot
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Configuration | |
@EnableKafka | |
@EnableKafkaStreams | |
public class KafkaStreamConfig { | |
@Value("${kafka.bootstrapAddress}") | |
private String bootstrapAddress; | |
@Value("${kafka.groupId}") | |
private String groupId; | |
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME) | |
public KafkaStreamsConfiguration streamsConfig() { | |
Map<String, Object> props = new HashMap<>(); | |
props.put( | |
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, | |
bootstrapAddress); | |
props.put( | |
ConsumerConfig.GROUP_ID_CONFIG, | |
groupId); | |
props.put( | |
StreamsConfig.APPLICATION_ID_CONFIG, | |
"stream-1-weather"); | |
JsonSerde<WeatherInfo> jsonSerde = new JsonSerde<>(WeatherInfo.class); | |
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass()); | |
return new KafkaStreamsConfiguration(props); | |
} | |
@Bean | |
public KStream<String, WeatherInfo> kStreamJson(StreamsBuilder builder) { | |
JsonDeserializer<WeatherInfo> deserializer = new JsonDeserializer<>(WeatherInfo.class); | |
deserializer.setRemoveTypeHeaders(false); | |
deserializer.addTrustedPackages("*"); | |
deserializer.setUseTypeMapperForKey(true); | |
JsonSerde<WeatherInfo> jsonSerde = new JsonSerde<>(new JsonSerializer<>(), deserializer); | |
KStream<String, WeatherInfo> stream = builder.stream("extreme-weather", | |
Consumed.with(Serdes.String(), jsonSerde)); | |
KStream<String, WeatherInfo> filter = stream.filter(new Predicate<String, WeatherInfo>() { | |
@Override | |
public boolean test(String key, WeatherInfo value) { | |
return value.getTemp() > 43 || value.getTemp() < -13; | |
} | |
}); | |
//Perform an action on each record of KStream. | |
filter.peek(new ForeachAction<String, WeatherInfo>() { | |
@Override | |
public void apply(String key, WeatherInfo value) { | |
System.out.println("Inside filter " + value.getCity()); | |
} | |
}); | |
filter.to("extreme-weather-filter"); | |
return stream; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment