Skip to content

Instantly share code, notes, and snippets.

@sdball
Last active January 23, 2020 05:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sdball/f4f95cf764ae37891baf1e8e8abd1788 to your computer and use it in GitHub Desktop.
Simple generic Ruby Kafka producer/consumer for testing
require "kafka"
require "logger"
require "optparse"
require "fileutils"

# Simple generic Kafka consumer for testing.
#
# Fetches messages from one topic/partition and echoes each message value to
# stdout along with a running message count and cumulative runtime. Stops when
# the optional --goal message count is reached, or on Ctrl-C.

script_name = File.basename($0, File.extname($0))
default_logfile = "logs/#{script_name}.log"
default_offset = "latest"

options = {}
OptionParser.new do |opts|
  opts.banner = "Usage: kafka-consumer.rb [options]"
  opts.on("-t TOPIC", "--topic", "Kafka topic") do |topic|
    options[:topic] = topic
  end
  opts.on("-b BROKERS", "--brokers", "Kafka brokers, e.g. docker:9092. (optional, defaults to $KAFKA)") do |brokers|
    options[:brokers] = brokers.split(",")
  end
  opts.on("-p PARTITION", "--partition", "Kafka partition to consume (optional, defaults to 0)") do |partition|
    # Integer() raises on non-numeric input instead of silently returning 0.
    options[:partition] = Integer(partition)
  end
  opts.on("-l LOGFILE", "--log-file", "Ruby Kafka logfile (optional, defaults to #{default_logfile})") do |logfile|
    options[:logfile] = logfile
  end
  opts.on("--client-id CLIENT_ID", "Ruby Kafka client id (optional, defaults to #{script_name})") do |client_id|
    options[:client_id] = client_id
  end
  opts.on("-o OFFSET", "--offset", "Use \"earliest\" to fetch all messages in the topic, \"latest\" to only fetch messages produced after this consumer starts. Defaults to \"#{default_offset}\".") do |offset|
    unless ["earliest", "latest"].include? offset
      raise ArgumentError, "Offset must be either \"earliest\" or \"latest\""
    end
    options[:offset] = offset
  end
  opts.on("-g GOAL", "--goal", "Set an expected number of messages to consume. (Use to time production to Kafka.)") do |goal|
    # Fail fast on a non-numeric goal instead of silently treating it as 0.
    options[:goal] = Integer(goal)
  end
end.parse!

# The topic is effectively required; fail with a clear message instead of an
# obscure Kafka error later.
topic = options[:topic]
abort "A topic is required: -t TOPIC" unless topic

logfile = options[:logfile] || default_logfile
# Logger.new raises if the log directory does not exist, so create it first.
FileUtils.mkdir_p(File.dirname(logfile))
logger = Logger.new(logfile)

brokers = options[:brokers] || ENV.fetch("KAFKA").split(",")
client_id = options[:client_id] || script_name
partition = options[:partition] || 0
offset = (options[:offset] || default_offset).to_sym
goal = options[:goal] || 0

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: client_id,
  socket_timeout: 20,
  logger: logger,
)

consumed = 0
start_time = nil
begin
  loop do
    messages = kafka.fetch_messages(
      topic: topic,
      partition: partition,
      offset: offset,
    )
    messages.each do |message|
      # Start the clock when the first message arrives, not at startup.
      start_time ||= Time.now.to_i
      # Advance the fetch offset past this message for the next fetch.
      offset = message.offset + 1
      consumed += 1
      puts "#{offset}: #{message.value} [Cumulative Runtime: #{Time.now.to_i - start_time} seconds] [#{consumed} messages so far]"
    end
    break if (goal > 0 && consumed >= goal)
  end
rescue Interrupt
  puts
ensure
  puts "Consumed #{consumed} messages."
  puts "Ran for #{Time.now.to_i - start_time} seconds." if start_time
  kafka.close
end
require "kafka"
require "logger"
require "optparse"
require "snappy"
require "fileutils"

# Simple generic Kafka producer for testing.
#
# Reads lines from stdin and produces each line as a message to the given
# topic, delivering after every line so messages show up in Kafka immediately.

script_name = File.basename($0, File.extname($0))
default_logfile = "logs/#{script_name}.log"

options = {}
OptionParser.new do |opts|
  opts.banner = "Usage: kafka-producer.rb [options]"
  opts.on("-t TOPIC", "--topic", "Kafka topic") do |topic|
    options[:topic] = topic
  end
  opts.on("-b BROKERS", "--brokers", "Kafka brokers, e.g. docker:9092. (optional, defaults to $KAFKA)") do |brokers|
    options[:brokers] = brokers.split(",")
  end
  opts.on("-l LOGFILE", "--log-file", "Ruby Kafka logfile (optional, defaults to #{default_logfile})") do |logfile|
    options[:logfile] = logfile
  end
  opts.on("-c COMPRESSION", "--compression", "Compression codec to use. \"gzip\" or \"snappy\"") do |compression|
    # Validate before to_sym: reject unknown codecs up front rather than
    # interning arbitrary user input and failing later inside the producer.
    unless ["gzip", "snappy"].include? compression
      raise ArgumentError, "Compression must be either \"gzip\" or \"snappy\""
    end
    options[:compression] = compression.to_sym
  end
  opts.on("--client-id CLIENT_ID", "Ruby Kafka client id (optional, defaults to #{script_name})") do |client_id|
    options[:client_id] = client_id
  end
end.parse!

# The topic is effectively required; fail with a clear message instead of an
# obscure Kafka error later.
topic = options[:topic]
abort "A topic is required: -t TOPIC" unless topic

logfile = options[:logfile] || default_logfile
# Logger.new raises if the log directory does not exist, so create it first.
FileUtils.mkdir_p(File.dirname(logfile))
logger = Logger.new(logfile)

brokers = options[:brokers] || ENV.fetch("KAFKA").split(",")
client_id = options[:client_id] || script_name

kafka = Kafka.new(
  seed_brokers: brokers,
  client_id: client_id,
  logger: logger,
)
producer = kafka.producer(compression_codec: options[:compression])

produced = 0
begin
  $stdin.each do |line|
    produced += 1
    producer.produce(line, topic: topic)
    # Deliver per line so each message reaches Kafka as soon as it is typed.
    producer.deliver_messages
  end
rescue Interrupt
  puts
ensure
  puts "Produced #{produced} messages."
  # Flush anything still buffered before shutting the producer down.
  producer.deliver_messages
  producer.shutdown
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment