@jsnouffer
Last active August 13, 2020 03:30
import java.util.Properties;
import com.google.gson.JsonObject;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
// ... //
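// Basic Kafka Streams configuration.
// LOCATION_OF_KAFKA_BROKERS is a placeholder for the broker list, defined elsewhere.
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, LOCATION_OF_KAFKA_BROKERS);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "MyStreamingApp");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Build the topology: read records from inputTopic (a topic name defined elsewhere).
// Note: only the key serde is configured here; a Serde for JsonObject values must be supplied separately.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, JsonObject> inputStream = builder.stream(inputTopic);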
// Create the Streams client from the topology and the properties above, then start it.
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();