# GitHub Gist machty/0b7d14752142d68be434c8d285ae93f1 by @machty, created February 13, 2017 19:58.
# Kafka Streams word-count example driven from JRuby against a local Confluent 3.1.2 install.
# Root of the local Confluent Platform install. Defaults to the original
# author's path; set CONFLUENT_HOME to point at a different install.
CONFLUENT_HOME = ENV.fetch('CONFLUENT_HOME', '/Users/machty/code/exc/confluent-3.1.2')

$CLASSPATH << File.join(CONFLUENT_HOME, 'share')
require 'rubygems'
require 'bundler'
Bundler.setup
# Load every Kafka jar under share/java/kafka. Sorting the glob result makes
# the load order deterministic: Dir.glob did not sort its results before
# Ruby 3.0, so the order could vary between filesystems.
Dir.glob(File.join(CONFLUENT_HOME, 'share', 'java', 'kafka', '**', '*.jar')).sort.each do |jar|
  require jar
end
#java_import 'java.util.Properties'
java_import 'org.apache.kafka.clients.consumer.ConsumerConfig'
java_import 'org.apache.kafka.common.serialization.Serdes'
java_import 'org.apache.kafka.streams.KafkaStreams'
java_import 'org.apache.kafka.streams.KeyValue'
java_import 'org.apache.kafka.streams.StreamsConfig'
java_import 'org.apache.kafka.streams.kstream.KStreamBuilder'
java_import 'org.apache.kafka.streams.kstream.KStream'
java_import 'org.apache.kafka.streams.kstream.KTable'
java_import 'org.apache.kafka.streams.kstream.KeyValueMapper'
java_import 'org.apache.kafka.streams.kstream.ValueMapper'
java_import 'java.util.Arrays'
java_import 'java.util.Locale'
java_import 'java.util.Properties'
# Kafka Streams configuration for the word-count app: application id,
# broker and ZooKeeper endpoints, String serdes for both keys and values,
# and a consumer auto-offset-reset of "earliest".
string_serde_class = Serdes.String().getClass().getName()
streams_settings = [
  [StreamsConfig::APPLICATION_ID_CONFIG,     "streams-wordcount"],
  [StreamsConfig::BOOTSTRAP_SERVERS_CONFIG,  "localhost:9092"],
  [StreamsConfig::ZOOKEEPER_CONNECT_CONFIG,  "localhost:2181"],
  [StreamsConfig::KEY_SERDE_CLASS_CONFIG,    string_serde_class],
  [StreamsConfig::VALUE_SERDE_CLASS_CONFIG,  string_serde_class],
  [ConsumerConfig::AUTO_OFFSET_RESET_CONFIG, "earliest"]
]
props = Properties.new
# Apply the settings in the order listed above.
streams_settings.each { |setting_name, setting_value| props.put(setting_name, setting_value) }
# Word-count topology: read lines from "streams-file-input", split each line
# into lowercase words, re-key every record by its word, count occurrences per
# word in the "Counts" store, and write the (word, count) changelog to
# "streams-wordcount-output".
builder = KStreamBuilder.new
source = builder.stream("streams-file-input")
counts = source
  .flatMapValues(->(value) {
    # Records may carry a null value; such a record contributes no words
    # instead of raising NoMethodError on nil.
    # Values are assumed to be strings (the default value serde in props is
    # String). Ruby's split(" ") splits on runs of whitespace and ignores
    # leading whitespace.
    words = value.nil? ? [] : value.downcase.split(" ")
    # Hand Kafka a java.util.List built from a Java array of the words.
    Arrays.asList(words.to_java)
  })
  .map(->(_key, value) {
    # Re-key by the word itself so groupByKey() collects identical words.
    KeyValue.new(value, value)
  })
  .groupByKey()
  .count("Counts")
# Counts are Longs, so the value serde is overridden here; keys stay Strings.
counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output")
# Run the topology for a fixed window, then shut the instance down.
run_seconds = 5
streams = KafkaStreams.new(builder, props)
begin
  streams.start()
  sleep run_seconds
ensure
  # Close unconditionally so the instance is shut down even if start() raises
  # or the sleep is interrupted (e.g. by Ctrl-C); the exception still propagates.
  streams.close()
end
puts "done"
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment