Skip to content

Instantly share code, notes, and snippets.

@mehmetcemyucel
Created December 22, 2022 16:03
Show Gist options
  • Save mehmetcemyucel/9922188290d8f463dac88490a1123471 to your computer and use it in GitHub Desktop.
Save mehmetcemyucel/9922188290d8f463dac88490a1123471 to your computer and use it in GitHub Desktop.
kafka-streams-basic
@Component
public class BasicStream {
private static final Serde<String> STRING_SERDE = Serdes.String();
private static final String INPUT_TOPIC = "basic-stream-input-topic";
private static final String OUTPUT_TOPIC = "basic-stream-output-topic";
@Autowired
void buildPipeline(StreamsBuilder streamsBuilder) {
KStream<String, String> messageStream = streamsBuilder.stream(INPUT_TOPIC, Consumed.with(STRING_SERDE, STRING_SERDE));
messageStream
.peek((key, val) -> System.out.println("1. Step key: " + key + ", val: " + val))
.mapValues(val -> val.substring(3))
.peek((key, val) -> System.out.println("2. Step key: " + key + ", val: " + val))
.filter((key, value) -> Long.parseLong(value) > 1)
.peek((key, val) -> System.out.println("3. Step key: " + key + ", val: " + val))
.to(OUTPUT_TOPIC, Produced.with(STRING_SERDE, STRING_SERDE));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment