Skip to content

Instantly share code, notes, and snippets.

@abhirockzz
Last active June 6, 2018 05:10
Show Gist options
  • Save abhirockzz/4e0230121bd66c7dfeac545d9afa9297 to your computer and use it in GitHub Desktop.
Save abhirockzz/4e0230121bd66c7dfeac545d9afa9297 to your computer and use it in GitHub Desktop.
Tip: ClassCastException in Kafka Streams reducer
See - https://simplydistributed.wordpress.com/2017/02/19/tip-classcastexception-in-kafka-streams-reducer
//using String serdes (global config)
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
KStream<String,String> originalStream = builder.stream("my-topic);
KStream<String,Double> mappedStream = ... ; //value mapping operation.. data type changed to Double (was String to begin with)
KGroupedStream<String, Double> groupedByKeyStream = mappedStream.groupByKey(); //WARNING!!! default Serdes (String) will be applied
KTable<String, Double> reduced = groupedByKeyStream.reduce(//reducer function , "readings-store"); //java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
Solution - explicit Serdes
KGroupedStream<String, Double> groupedByKeyStream = mappedStream.groupByKey(Serdes.String(), Serdes.Double());
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment