Skip to content

Instantly share code, notes, and snippets.

@ivanursul
Created April 10, 2017 08:36
Show Gist options
  • Save ivanursul/87949fd35b67c6a7b22ceef7af72ea1c to your computer and use it in GitHub Desktop.
Save ivanursul/87949fd35b67c6a7b22ceef7af72ea1c to your computer and use it in GitHub Desktop.
package org.kafka.examples;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
/**
 * Minimal Kafka Streams application that registers a single {@link GlobalKTable}
 * backed by the topic {@code WordsWithCountsTopic2} and materialized in the state
 * store {@code WordCounts}, then starts the streams instance.
 *
 * <p>Keys are read as {@code String} and values as {@code Long}. The broker address
 * and application id are hard-coded for a local setup.
 */
public class Main {

    /**
     * Builds the configuration and topology, installs a shutdown hook that closes
     * the streams instance, and starts processing.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Default serdes for anything that does not specify its own.
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        KStreamBuilder builder = new KStreamBuilder();
        // The call registers the global table on the builder; the returned handle is
        // not needed here because nothing joins against the table in this example.
        // Values are Long, so the serdes are passed explicitly rather than relying on
        // the String defaults configured above.
        builder.globalTable(stringSerde, longSerde, "WordsWithCountsTopic2", "WordCounts");

        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        // Install the hook before starting so that close() is already in place if the
        // JVM is asked to shut down while the streams instance is coming up.
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment