Skip to content

Instantly share code, notes, and snippets.

@ylansegal
Last active April 2, 2020 23:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ylansegal/b047ff27026afc9f0a4fc22fb7c64a58 to your computer and use it in GitHub Desktop.
Save ylansegal/b047ff27026afc9f0a4fc22fb7c64a58 to your computer and use it in GitHub Desktop.
Kafka POC
# Kafka consumer for the "people_v2" topic. Keeps a running tally of how often
# each first name has been seen and prints the tally whenever the message
# offset is a multiple of ten.
require "bundler/inline"
require 'yaml'
# require 'pry'
require "./person" # expected to define Person; resolved against the working directory

gemfile do
  source "https://rubygems.org"
  gem 'ruby-kafka'
end

puts "Connecting to kafka"
kafka = Kafka.new(["localhost:9092"], client_id: "ruby-consumer")

# Default of 0 lets `+= 1` work the first time a name is seen.
first_name_counter = Hash.new(0)

kafka.each_message(topic: "people_v2") do |message|
  puts "Consuming #{message.offset}"

  # The payload arrives over the network, so deserialize it with an explicit
  # allow-list instead of YAML.load. Person is the struct being revived and
  # Symbol is needed because Psych symbolizes the struct member names.
  # Plain YAML.load either revives arbitrary objects (older Psych) or rejects
  # the struct tag outright (Psych 4+, where load is safe by default).
  person = YAML.safe_load(message.value, permitted_classes: [Person, Symbol])
  first_name_counter[person.first_name] += 1

  puts first_name_counter if (message.offset % 10).zero?
end
# Local ZooKeeper + single Kafka broker for development.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      # Address the broker advertises to clients; clients on the host connect
      # through the published port 9092.
      KAFKA_ADVERTISED_HOST_NAME: localhost
      # Comma-separated entries in the form name:partitions:replicas.
      KAFKA_CREATE_TOPICS: 'people_v2:1:1,ffs:1:1'
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
# Kafka producer that publishes randomly generated package events, encoded as
# JSON, to the 'ffs' topic. Runs until interrupted, sleeping rand(3) seconds
# before each event.
require "bundler/inline"
require 'json'

gemfile do
  source "https://rubygems.org"
  gem 'ruby-kafka'
  gem 'activesupport'
end

# Provides Integer#days, used below to build accounting_date.
require 'active_support/core_ext/numeric/time'

puts "Connecting to kafka"
kafka = Kafka.new(["localhost:9092"], client_id: "ruby-producer")

STATUS = ['Draft', 'Out For Bid', 'Out For Signature', 'Approved', 'Complete', 'Terminated']

loop do
  sleep rand(3)

  id = rand(100)
  status = STATUS.sample
  title = "#{rand(1000)} #{%w[Main Elm High].sample} St"

  # Between one and nine line items; ids are derived from the package id so
  # they stay unique within a package.
  line_items = Array.new(rand(9) + 1) do |i|
    {
      id: id * 1000 + i,
      wbs_code_id: rand(100),
      amount: rand(5000) + (rand(100).to_f / 100),
      currency: "USD",
      accounting_date: rand(10).days.ago.to_date,
      units: %w[Each sqft m2 yards].sample
    }
  end

  event = {
    event: status,
    package: { id: id, type: 'PrimeContract', title: title },
    line_items: line_items
  }

  puts "Publishing event with package.id: #{event[:package][:id]}"

  # The package id doubles as the message key.
  kafka.deliver_message(event.to_json, key: id.to_s, topic: 'ffs')
end
# Simple record for a person. Positional fields, in order: id, first_name, last_name.
Person = Struct.new(*%i[id first_name last_name])
# Kafka consumer for the "ffs" topic. For every message it prints the offset
# and then the JSON-decoded payload.
require "bundler/inline"
require 'json'

gemfile do
  source "https://rubygems.org"
  gem 'ruby-kafka'
end

puts "Connecting to kafka"
kafka = Kafka.new(["localhost:9092"], client_id: "ruby-consumer")

kafka.each_message(topic: "ffs") do |record|
  puts "Consuming #{record.offset}"

  # Decode after announcing the offset so a malformed payload can be traced
  # back to the message that carried it.
  payload = JSON.parse(record.value)
  puts payload
end
# Kafka producer that publishes randomly generated Person records, serialized
# as YAML, to the "people_v2" topic. Runs until interrupted, sleeping rand(5)
# seconds before each record.
require "bundler/inline"
# Required explicitly so the script does not depend on a gem happening to
# load them: SecureRandom.uuid and Object#to_yaml are both used below.
require 'securerandom'
require 'yaml'
require "./person" # expected to define Person; resolved against the working directory

gemfile do
  source "https://rubygems.org"
  gem 'ruby-kafka'
  gem 'faker'
end

puts "Connecting to kafka"
kafka = Kafka.new(["localhost:9092"], client_id: "ruby-producer")

loop do
  sleep rand(5)

  person = Person.new(SecureRandom.uuid, Faker::Name.first_name, Faker::Name.last_name)
  puts "Producing #{person}"

  # The person's id doubles as the message key.
  kafka.deliver_message(person.to_yaml, key: person.id, topic: "people_v2")
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment