Skip to content

Instantly share code, notes, and snippets.

Created August 13, 2017 21:48
Show Gist options
  • Save mcSw4p/1c7baffb9889b7cc356dac9caafaba08 to your computer and use it in GitHub Desktop.
Save mcSw4p/1c7baffb9889b7cc356dac9caafaba08 to your computer and use it in GitHub Desktop.
Kafka examples.
package kafka.streams;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
/*
 * Copyright (C) 2017 Sw4p
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 * @author Sw4p
 */
public class ExactlyOnceStream {
// Disclaimer: I just started with Kafka, so i'm going to miss things. This is just what worked for me. I noticed there are not many deep tutorials on the
// new exactly-once semantics in kafka and I had to dig deep to find what I have here for work. I don't want anyone else to have the same headache. -Sw4p
// Run from the start of the program. You can start it from where-ever. Typically each stream is handled in its own thread,
// normally part of a thread pool or similar type of pooling.
public static void main(String[] args){
// Before you start I would always suggest reading a bit on the topic. That way you are not completely confused when things go bad.
// Here is a few links to some help information on this topic:
// Get the properties of the Kafka Stream. This is where you can set the individual properties for the stream. How many bytes to accept... so on.
Properties streamsProperties = getProperties();
// String serializers and deserializers for the Kafka records.
Serde<String> stringSerde = Serdes.String();
// Kafka stream builder. What builds the stream we are creating
KStreamBuilder builder = new KStreamBuilder();
// Get all the records from the defined topic that haven't been consumed or committed.
KStream<String, String> records =, stringSerde, "streams-test-topic");
// Do something with records
// For this example we will forward each record to an endpoint. Using a lambda expression
records.foreach((key, value) -> forwardRecord(value));
// Kafka Steam its self
KafkaStreams stream = new KafkaStreams(builder, streamsProperties);
// Clean up the stream before we start it.
// Start the stream.
// Do other things
// Normally you would put this in a loop and close the stream when the loop ends. But for example purposes...
try {
}catch(InterruptedException e) {}
private static Properties getProperties(){
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-example");
//props.put("", "kafka-streams-example");
props.put(StreamsConfig.CLIENT_ID_CONFIG, "kafka-streams-example-client");
//props.put("", "kafka-streams-example-client");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "");
//props.put("bootstrap.servers", "");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
//props.put("default.key.serde", Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
//props.put("default.value.serde", Serdes.String().getClass().getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
//props.put("", 100);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
//props.put("cache.max.bytes.buffering", 0);
// The magic. Set the stream to use exactly-once semantics rather than at-least-once..
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
//props.put("processing.guarantee", "exactly_once");
return props;
private static void forwardRecord(String record){
// Code to forward the message to the endpoint. This is not the point of this example...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment