Kafka Mongo Consumer
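#!/usr/bin/env ruby
# Reads messages from a Kafka topic and stores each one as a MongoDB document.
require 'poseidon'
require 'mongo'

# Quiet the MongoDB driver's default debug logging.
Mongo::Logger.logger.level = Logger::WARN

db = Mongo::Client.new(['127.0.0.1:27017'], database: 'metrics')

# Client id "mongo_consumer", broker localhost:9092, topic "metrics",
# partition 0, starting from the earliest available offset.
consumer = Poseidon::PartitionConsumer.new("mongo_consumer", "localhost", 9092, "metrics", 0, :earliest_offset)

loop do
  # Fetch the next batch of messages from the partition.
  messages = consumer.fetch
  messages.each do |m|
    result = db[:metrics].insert_one({ metric: m.value })
    # result.n is the number of documents written by the insert.
    if result.n == 1
      puts "MONGO --> Inserted metric: #{m.value}"
    end
  end
end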
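
The loop above issues one insert per message. A minimal sketch of a batched variant follows, assuming it is acceptable to write each fetched batch in a single call. `build_metric_documents` and `FakeMessage` are hypothetical names introduced for illustration and are not part of the original script. The commented-out lines assume the mongo driver's `Collection#insert_many` and the `inserted_count` reader on its result.

```ruby
# Hypothetical helper (not in the original script): turns fetched Kafka
# messages into MongoDB documents. Any object responding to #value works.
def build_metric_documents(messages)
  messages.map { |m| { metric: m.value } }
end

# Stand-in for a Poseidon message so the helper can run without a broker.
FakeMessage = Struct.new(:value)

docs = build_metric_documents([FakeMessage.new("cpu=0.42"), FakeMessage.new("mem=512")])
puts docs.inspect

# Inside the consumer loop, the batch could then be written in one round trip
# (assumption: db and consumer are set up as in the script above):
#   docs = build_metric_documents(consumer.fetch)
#   result = db[:metrics].insert_many(docs) unless docs.empty?
#   puts "MONGO --> Inserted #{result.inserted_count} metrics" if result
```

Keeping the message-to-document mapping in its own method means it can be checked without a running Kafka broker or MongoDB instance. The `docs.empty?` guard skips the write when a fetch returns no messages.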