Skip to content

Instantly share code, notes, and snippets.

@victorarias
Created March 21, 2018 20:29
Show Gist options
  • Save victorarias/4faf8a382742a92a28e18c13e39b6c5c to your computer and use it in GitHub Desktop.
Save victorarias/4faf8a382742a92a28e18c13e39b6c5c to your computer and use it in GitHub Desktop.
require "kafka"
require "json"
# Publishes an endless stream of bid messages to the "bids" topic through a
# client obtained from Kafka.new, pausing one second between messages.
class Producer
  # @param producer_id identifier used as the Kafka client id, embedded in
  #   every message payload, and prefixed to every log line
  def initialize(producer_id)
    @producer_id = producer_id
    @count = 0
    @kafka = Kafka.new(["localhost:9092"], client_id: @producer_id)
  end

  # Announces start-up, then loops forever: picks a random item id in 1..10,
  # builds a bid whose amount is that id multiplied by the number of messages
  # attempted so far, hands it off for delivery, and sleeps for one second.
  # The loop has no exit condition of its own.
  def run
    rng = Random.new
    puts "#{@producer_id}: Starting to publish messages..."

    loop do
      @count += 1
      item_id = 1 + rng.rand(10)
      bid = { bid_amount: item_id * @count, producer_id: @producer_id }
      log_and_deliver_message(key: item_id, message: bid)
      sleep 1
    end
  end

  private

  # Serializes the message to JSON, logs what is about to be sent, and
  # delivers it to the "bids" topic.
  #
  # @param key partition key; converted with #to_s before delivery
  # @param message object passed to JSON.dump to form the message value
  def log_and_deliver_message(key:, message:)
    payload = JSON.dump(message)
    key_string = key.to_s
    puts "#{@producer_id}: Publishing message with key=#{key_string} message=#{payload}"
    @kafka.deliver_message(payload, key: key_string, topic: "bids")
  end
end
# Start two producers ("producer_1" and "producer_2"), each running its
# publish loop on its own thread.
(1..2).map do |id|
  Thread.new { Producer.new("producer_#{id}").run }
end

# Hold the main thread here until a line of input is read; the producer
# threads keep publishing in the meantime.
gets
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment