Last active
June 6, 2018 05:10
Tip: ClassCastException in Kafka Streams reducer
See - https://simplydistributed.wordpress.com/2017/02/19/tip-classcastexception-in-kafka-streams-reducer
// Using String Serdes as the global default config
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
KStream<String, String> originalStream = builder.stream("my-topic");
KStream<String, Double> mappedStream = ... ; // value mapping operation: the value type changes from String to Double
KGroupedStream<String, Double> groupedByKeyStream = mappedStream.groupByKey(); // WARNING: the default Serdes (String) will be applied
KTable<String, Double> reduced = groupedByKeyStream.reduce(/* reducer function */, "readings-store"); // java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Double
Solution - explicit Serdes
KGroupedStream<String, Double> groupedByKeyStream = mappedStream.groupByKey(Serdes.String(), Serdes.Double());
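Why does the mismatch only show up at runtime? A rough sketch in plain Java, with no Kafka dependency, may help. Everything in it (`ValueSerializer`, `StringSerializer`, `DoubleSerializer`, `storeWith`, `failsWithClassCast`) is a hypothetical stand-in invented for illustration and is not Kafka's API. The assumption is that a framework which looks up a default serializer from configuration has to cast it to the stream's generic type. Because of type erasure that cast is unchecked, so it compiles cleanly and the wrong serializer fails only when it first sees a value.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SerdeMismatchDemo {

    // Hypothetical stand-in for a serializer contract; NOT Kafka's Serializer interface.
    interface ValueSerializer<T> {
        byte[] serialize(T value);
    }

    // Plays the role of the globally configured default (String).
    static class StringSerializer implements ValueSerializer<String> {
        @Override
        public byte[] serialize(String value) {
            return value.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Plays the role of the explicitly supplied serializer (Double).
    static class DoubleSerializer implements ValueSerializer<Double> {
        @Override
        public byte[] serialize(Double value) {
            return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
        }
    }

    // Mimics a framework applying whichever serializer was configured.
    // The unchecked cast compiles because generic type arguments are erased.
    @SuppressWarnings("unchecked")
    static byte[] storeWith(ValueSerializer<?> configured, Object value) {
        ValueSerializer<Object> serializer = (ValueSerializer<Object>) configured;
        // The compiler-generated bridge method casts 'value' to the serializer's
        // real type here, which is where a mismatch surfaces.
        return serializer.serialize(value);
    }

    // Returns true if applying the serializer to the value throws ClassCastException.
    static boolean failsWithClassCast(ValueSerializer<?> configured, Object value) {
        try {
            storeWith(configured, value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Object reading = 21.5; // a Double value, like the mapped stream's values
        System.out.println("default String serializer fails: "
                + failsWithClassCast(new StringSerializer(), reading));
        System.out.println("explicit Double serializer fails: "
                + failsWithClassCast(new DoubleSerializer(), reading));
    }
}
```

Passing the matching serializer explicitly, as the `groupByKey(Serdes.String(), Serdes.Double())` call above does, avoids the unchecked fallback to the String default.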