Skip to content

Instantly share code, notes, and snippets.

@aviflax
Created February 18, 2016 21:29
Show Gist options
  • Save aviflax/43e206015f208129aef4 to your computer and use it in GitHub Desktop.
Save aviflax/43e206015f208129aef4 to your computer and use it in GitHub Desktop.
JRuby Kafka Consumer Vendor using Confluent’s KafkaAvroDeserializer
require_relative './config'
java_import org.apache.kafka.clients.consumer.KafkaConsumer
java_import org.apache.kafka.clients.consumer.ConsumerConfig
# This class name must be in a string, apparently because it starts with io
java_import 'io.confluent.kafka.serializers.KafkaAvroDeserializer'
# We need to reuse consumers between requests because they maintain a buffer
# and a pool of TCP/IP connections to the Kafka nodes. This also provides a
# good way to stub/mock the consumers for tests (the tests just stub `get` to
# return a mock).
module Consumers
@consumers = {}
def self.get(id)
@consumers.fetch(id) { @consumers[id] = make(id) }
end
def self.make(id)
config = java.util.Properties.new
config.putAll(
ConsumerConfig::BOOTSTRAP_SERVERS_CONFIG =>
AppConfig[:kafka_host] + ':' + AppConfig[:kafka_port],
ConsumerConfig::CLIENT_ID_CONFIG => id,
ConsumerConfig::GROUP_ID_CONFIG => id,
ConsumerConfig::ENABLE_AUTO_COMMIT_CONFIG => 'false',
ConsumerConfig::SESSION_TIMEOUT_MS_CONFIG => '30000',
ConsumerConfig::KEY_DESERIALIZER_CLASS_CONFIG =>
KafkaAvroDeserializer.java_class,
ConsumerConfig::VALUE_DESERIALIZER_CLASS_CONFIG =>
KafkaAvroDeserializer.java_class,
'schema.registry.url' =>
"http://#{AppConfig[:registry_host]}:#{AppConfig[:registry_port]}"
)
KafkaConsumer.new config
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment