Skip to content

Instantly share code, notes, and snippets.

@matzew
Last active August 14, 2017 13:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matzew/bbebb4f78523c5e0b2c098c9830482c4 to your computer and use it in GitHub Desktop.
Save matzew/bbebb4f78523c5e0b2c098c9830482c4 to your computer and use it in GitHub Desktop.
package net.wessendorf;
import net.wessendorf.kafka.cdi.annotation.KafkaStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
public class StreamProcessor {
@KafkaStream(input = "input_topic", output = "output_topic")
public KStream transformer(final KStream<String, String> source) {
return ource.filter((key, value) -> value.equals("Success"))
.groupByKey()
.count("successMessagesStore")
toStream();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment