Skip to content

Instantly share code, notes, and snippets.

@mcSw4p
Created August 13, 2017 21:48
Show Gist options
  • Save mcSw4p/1c7baffb9889b7cc356dac9caafaba08 to your computer and use it in GitHub Desktop.
Save mcSw4p/1c7baffb9889b7cc356dac9caafaba08 to your computer and use it in GitHub Desktop.
Kafka 0.11.0.0 examples.
package kafka.streams;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
/**
* Copyright (C) 2017 Sw4p
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @author Sw4p
*
*/
public class ExactlyOnceStream {

    /** Topic the example topology reads from. */
    private static final String SOURCE_TOPIC = "streams-test-topic";

    /** How long the demo lets the stream run before shutting it down, in milliseconds. */
    private static final long RUN_TIME_MS = 5000L;

    // Disclaimer: I just started with Kafka, so I'm going to miss things. This is just what worked for me. I noticed there
    // are not many deep tutorials on the new exactly-once semantics in Kafka and I had to dig deep to find what I have here
    // for work. I don't want anyone else to have the same headache. -Sw4p
    // Run from the start of the program, or start it from wherever you like. Typically each stream is handled in its own
    // thread, normally as part of a thread pool or a similar kind of pooling.

    /**
     * Builds a topology that passes every record value from {@code streams-test-topic} to
     * {@link #forwardRecord(String)}, runs it for a few seconds with exactly-once processing
     * enabled, then closes it.
     *
     * <p>Background reading before you start:
     * <ul>
     *   <li>https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics</li>
     *   <li>https://kafka.apache.org/0110/documentation/streams/quickstart</li>
     * </ul>
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Per-stream configuration (application id, brokers, serdes, processing guarantee, ...).
        Properties streamsProperties = getProperties();
        // String serializer/deserializer used for both record keys and values.
        Serde<String> stringSerde = Serdes.String();
        // Builder that describes the processing topology before it is started.
        KStreamBuilder builder = new KStreamBuilder();
        // Subscribe to the source topic. Consumption resumes from this application's committed offsets
        // (or from the auto.offset.reset position when no offsets have been committed yet).
        KStream<String, String> records = builder.stream(stringSerde, stringSerde, SOURCE_TOPIC);
        // Do something with each record: here, hand its value to an external endpoint.
        records.foreach((key, value) -> forwardRecord(value));
        // The Kafka Streams instance itself.
        KafkaStreams stream = new KafkaStreams(builder, streamsProperties);
        // Wipe this application's local state directory so the run starts fresh.
        // Handy for demos; in production this discards local state that would otherwise be reused.
        stream.cleanUp();
        try {
            stream.start();
            // Do other things. Normally you would keep running until a shutdown signal arrives and then
            // close the stream; for example purposes we simply run for a fixed time.
            Thread.sleep(RUN_TIME_MS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever owns this thread; the stream is still closed below.
            Thread.currentThread().interrupt();
        } finally {
            // Always release the stream's threads and clients, even if start() threw or we were interrupted.
            stream.close();
        }
    }

    /**
     * Builds the configuration for the example Kafka Streams application.
     *
     * <p>Each setting uses a {@link StreamsConfig} constant; the trailing comment gives the
     * equivalent raw property key for use in a properties file.
     *
     * @return a new {@link Properties} instance holding the stream configuration
     */
    private static Properties getProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-example");          // "application.id"
        props.put(StreamsConfig.CLIENT_ID_CONFIG, "kafka-streams-example-client");        // "client.id"
        // Use a routable address for clients; 0.0.0.0 is a bind-any address and is not a portable connect target.
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");               // "bootstrap.servers"
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());   // "default.key.serde"
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); // "default.value.serde"
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);                           // "commit.interval.ms"
        // 0 disables the record cache so every record is forwarded downstream immediately.
        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);                      // "cache.max.bytes.buffering"
        // Use exactly-once rather than at-least-once processing ("processing.guarantee" = "exactly_once").
        // This makes consumed offsets, state-store updates and writes to Kafka topics commit atomically via
        // Kafka transactions; it requires brokers running 0.11.0 or newer.
        // NOTE(review): single-broker dev clusters may need transaction.state.log.replication.factor /
        // min.isr lowered on the broker for transactions to work — confirm against your broker config.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        return props;
    }

    /**
     * Forwards a single record value to an external endpoint; called once per record processed by
     * the {@code foreach} in {@link #main(String[])}.
     *
     * <p>Important: the exactly-once guarantee covers Kafka reads, state and writes only. A side
     * effect performed here against an external system can be repeated if the stream task fails and
     * reprocesses records, so the implementation should be idempotent (or deduplicate downstream).
     *
     * @param record the record value to forward
     */
    private static void forwardRecord(String record) {
        // Code to forward the message to the endpoint. This is not the point of this example...
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment