Created
February 18, 2016 21:29
-
-
Save aviflax/43e206015f208129aef4 to your computer and use it in GitHub Desktop.
JRuby Kafka Consumer Vendor using Confluent’s KafkaAvroDeserializer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require_relative './config' | |
java_import org.apache.kafka.clients.consumer.KafkaConsumer | |
java_import org.apache.kafka.clients.consumer.ConsumerConfig | |
# This class name must be in a string, apparently because it starts with io | |
java_import 'io.confluent.kafka.serializers.KafkaAvroDeserializer' | |
# Registry of Kafka consumers, keyed by consumer id.
#
# We need to reuse consumers between requests because they maintain a buffer
# and a pool of TCP/IP connections to the Kafka nodes. This also provides a
# good way to stub/mock the consumers for tests (the tests just stub `get` to
# return a mock).
module Consumers
  # Consumers built so far, keyed by the id they were requested with.
  @consumers = {}

  # Returns the consumer registered under +id+, building and caching it on
  # first use so later calls with the same id get the same instance.
  #
  # @param id [String] used as both the Kafka client id and group id
  # @return [KafkaConsumer]
  def self.get(id)
    @consumers[id] ||= make(id)
  end

  # Builds a new, uncached consumer whose keys and values are both decoded
  # with Confluent's KafkaAvroDeserializer. Broker and schema-registry
  # addresses are read from AppConfig.
  #
  # @param id [String] used as both the Kafka client id and group id
  # @return [KafkaConsumer]
  def self.make(id)
    # Interpolate (rather than concatenate with `+`) so host and port may be
    # configured as either Strings or Integers, as for the registry URL below.
    bootstrap_servers = "#{AppConfig[:kafka_host]}:#{AppConfig[:kafka_port]}"
    registry_url =
      "http://#{AppConfig[:registry_host]}:#{AppConfig[:registry_port]}"

    config = java.util.Properties.new
    config.putAll(
      ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG => bootstrap_servers,
      ConsumerConfig::CLIENT_ID_CONFIG => id,
      ConsumerConfig::GROUP_ID_CONFIG => id,
      # Offsets are not committed automatically.
      ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG => 'false',
      ConsumerConfig::SESSION_TIMEOUT_MS_CONFIG => '30000',
      ConsumerConfig::KEY_DESERIALIZER_CLASS_CONFIG =>
        KafkaAvroDeserializer.java_class,
      ConsumerConfig::VALUE_DESERIALIZER_CLASS_CONFIG =>
        KafkaAvroDeserializer.java_class,
      # Read by KafkaAvroDeserializer to locate the schema registry.
      'schema.registry.url' => registry_url
    )

    KafkaConsumer.new(config)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment