@blurredbits
Last active May 17, 2018 16:50
Kafka PostgreSQL Consumer
#!/usr/bin/env ruby
require 'poseidon'
require 'pg'
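# Connect to the default 'postgres' database first so the metrics database can be created.
conn = PG.connect(host: 'localhost', port: 5432, dbname: 'postgres', user: 'postgres')
begin
  conn.exec("CREATE DATABASE metrics")
rescue PG::DuplicateDatabase
  # The database already exists; nothing to do.
end
# Reconnect to the new database so the table is created there rather than in 'postgres'.
conn.close
conn = PG.connect(host: 'localhost', port: 5432, dbname: 'metrics', user: 'postgres')
begin
  conn.exec("CREATE TABLE metrics (value character varying(255));")
rescue PG::DuplicateTable
  # The table already exists; nothing to do.
end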
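# Read partition 0 of the "metrics" topic, starting from the earliest available offset.
consumer = Poseidon::PartitionConsumer.new("pg_consumer", "localhost", 9092, "metrics", 0, :earliest_offset)
loop do
  messages = consumer.fetch
  messages.each do |m|
    # Pass the value as a bind parameter so quotes in a message cannot break or alter the statement.
    conn.exec_params("INSERT INTO metrics VALUES ($1)", [m.value])
    puts "Postgres ---> Inserted metric: #{m.value}"
  end
end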
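
The insert step in the loop above could be factored into a small helper that sends each message value as a bind parameter through `exec_params`, so a value containing a quote does not break the statement. This is a minimal sketch, not part of the original gist: `store_metrics` and `INSERT_SQL` are hypothetical names, and the helper assumes only that the connection responds to `exec_params(sql, params)` and that each message responds to `value`.

```ruby
# Hypothetical helper, not part of the original gist.
INSERT_SQL = 'INSERT INTO metrics VALUES ($1)'

# conn:     any object responding to exec_params(sql, params), e.g. a PG::Connection.
# messages: an array of objects responding to #value, e.g. the result of consumer.fetch.
# Returns the number of rows submitted.
def store_metrics(conn, messages)
  messages.each do |m|
    # The value travels separately from the SQL text, so no manual quoting is needed.
    conn.exec_params(INSERT_SQL, [m.value])
  end
  messages.size
end
```

Inside the consumer loop this would be called as `store_metrics(conn, consumer.fetch)`. Keeping the database call behind one method also makes it possible to exercise the insert logic with a stand-in connection object, without a running PostgreSQL or Kafka instance.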