Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Kafka PostgreSQL Consumer
#!/usr/bin/env ruby
require 'poseidon'
require 'pg'
conn = PG.connect(host: 'localhost', port: 5432, dbname: 'postgres', user: 'postgres')
begin
conn.exec("CREATE DATABASE metrics")
rescue PG::DuplicateDatabase
end
begin
conn.exec("CREATE TABLE metrics (VALUE character varying(255));")
rescue PG::DuplicateTable
end
consumer = Poseidon::PartitionConsumer.new("pg_consumer", "localhost", 9092, "metrics", 0, :earliest_offset)
loop do
messages = consumer.fetch
messages.each do |m|
conn.exec_params("INSERT INTO metrics values ('#{m.value}')")
puts "Postgres ---> Inserted metric: #{m.value}"
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.