Skip to content

Instantly share code, notes, and snippets.

@victorarias
Last active March 21, 2018 20:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save victorarias/442b86640fa89e2dec0549af9302e19d to your computer and use it in GitHub Desktop.
Save victorarias/442b86640fa89e2dec0549af9302e19d to your computer and use it in GitHub Desktop.
Ruby + Java via Kafka integration
/**
 * Entry point of the Java bids consumer.
 *
 * Subscribes to the "bids" topic and, in an endless loop, prints the key and
 * payload of every record received to standard output, committing offsets
 * after each polled batch.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    // try-with-resources: the loop below never exits normally, so this is what
    // closes the consumer when an exception propagates out of it.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(getProperties())) {
        consumer.subscribe(Collections.singletonList("bids"));
        System.out.println("Starting polling from Java bids consumer:");
        while (true) {
            // Block for at most 100 ms waiting for new records.
            // NOTE(review): poll(long) is deprecated in newer kafka-clients releases in
            // favour of poll(Duration) — confirm the client version before switching.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                String message = String.format("key: %s, payload: %s", record.key(), record.value());
                System.out.println(message);
            }
            // Commit synchronously once the whole batch has been printed.
            consumer.commitSync();
        }
    }
}
/**
 * Builds the configuration used to create the Kafka consumer.
 *
 * @return properties naming the broker at localhost:9092, the consumer group
 *         "HelloConsumer", and the string deserializer class for both keys and values
 */
private static Properties getProperties() {
    // Keys and values are both read as strings, so one class name serves both settings.
    final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

    Properties config = new Properties();
    config.put("bootstrap.servers", "localhost:9092");
    config.put("group.id", "HelloConsumer");
    config.put("key.deserializer", stringDeserializer);
    config.put("value.deserializer", stringDeserializer);
    return config;
}
Starting polling from Java bids consumer:
key: 4, payload: {"bid_amount":4,"producer_id":"producer_1"}
key: 9, payload: {"bid_amount":9,"producer_id":"producer_2"}
key: 6, payload: {"bid_amount":12,"producer_id":"producer_1"}
key: 9, payload: {"bid_amount":18,"producer_id":"producer_2"}
key: 5, payload: {"bid_amount":15,"producer_id":"producer_2"}
key: 8, payload: {"bid_amount":24,"producer_id":"producer_1"}
key: 2, payload: {"bid_amount":8,"producer_id":"producer_1"}
key: 6, payload: {"bid_amount":24,"producer_id":"producer_2"}
key: 2, payload: {"bid_amount":10,"producer_id":"producer_1"}
key: 7, payload: {"bid_amount":35,"producer_id":"producer_2"}
key: 9, payload: {"bid_amount":54,"producer_id":"producer_1"}
key: 3, payload: {"bid_amount":18,"producer_id":"producer_2"}
key: 2, payload: {"bid_amount":14,"producer_id":"producer_2"}
key: 4, payload: {"bid_amount":28,"producer_id":"producer_1"}
require "kafka"
require "json"
# Publishes an endless stream of JSON-encoded bid messages to the "bids"
# Kafka topic through a client connected to localhost:9092.
class Producer
  # @param producer_id [String] name used as the Kafka client id, as the
  #   prefix of every log line and as the "producer_id" field of each message
  def initialize(producer_id)
    @producer_id = producer_id
    @kafka = Kafka.new(["localhost:9092"], client_id: @producer_id)
    @count = 0
  end

  # Publishes one bid per second, forever. Each bid is keyed by a random item
  # id between 1 and 10; its amount is that id multiplied by the number of
  # messages this producer has sent so far (including the current one).
  def run
    rng = Random.new
    puts "#{@producer_id}: Starting to publish messages..."

    loop do
      @count += 1
      item_id = rng.rand(1..10)
      bid = { bid_amount: item_id * @count, producer_id: @producer_id }
      log_and_deliver_message(key: item_id, message: bid)
      sleep 1
    end
  end

  private

  # Serialises the message to JSON, logs it to stdout and delivers it to the
  # "bids" topic with the stringified key.
  #
  # @param key [#to_s] message key
  # @param message [Hash] payload to encode as JSON
  def log_and_deliver_message(key:, message:)
    payload = JSON.dump(message)
    key_text = key.to_s
    puts "#{@producer_id}: Publishing message with key=#{key_text} message=#{payload}"
    @kafka.deliver_message(payload, key: key_text, topic: "bids")
  end
end
# Start two producers ("producer_1" and "producer_2"), each publishing from
# its own background thread.
1.upto(2).each do |id|
  Thread.new { Producer.new("producer_#{id}").run }
end

# Keep the main thread alive until Enter is pressed (or stdin reaches EOF);
# the producer threads are terminated when the script then exits.
# Read from $stdin explicitly: a bare `gets` goes through ARGF and would try
# to read from files named by any command-line arguments instead.
$stdin.gets
19:54:19 kafka_rb$ be ruby producer.rb
producer_2: Starting to publish messages...
producer_2: Publishing message with key=9 message={"bid_amount":9,"producer_id":"producer_2"}
producer_1: Starting to publish messages...
producer_1: Publishing message with key=4 message={"bid_amount":4,"producer_id":"producer_1"}
producer_1: Publishing message with key=6 message={"bid_amount":12,"producer_id":"producer_1"}
producer_2: Publishing message with key=9 message={"bid_amount":18,"producer_id":"producer_2"}
producer_2: Publishing message with key=5 message={"bid_amount":15,"producer_id":"producer_2"}
producer_1: Publishing message with key=8 message={"bid_amount":24,"producer_id":"producer_1"}
producer_1: Publishing message with key=2 message={"bid_amount":8,"producer_id":"producer_1"}
producer_2: Publishing message with key=6 message={"bid_amount":24,"producer_id":"producer_2"}
producer_1: Publishing message with key=2 message={"bid_amount":10,"producer_id":"producer_1"}
producer_2: Publishing message with key=7 message={"bid_amount":35,"producer_id":"producer_2"}
producer_1: Publishing message with key=9 message={"bid_amount":54,"producer_id":"producer_1"}
producer_2: Publishing message with key=3 message={"bid_amount":18,"producer_id":"producer_2"}
producer_2: Publishing message with key=2 message={"bid_amount":14,"producer_id":"producer_2"}
producer_1: Publishing message with key=4 message={"bid_amount":28,"producer_id":"producer_1"}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment