@khun84
Last active December 19, 2022 13:54
Kafka
require 'kafka'
require 'avro_turf/messaging'

kaf_broker = "from-ur-credentials"
kaf_api_key = "from-ur-credentials"
kaf_api_secret = "from-ur-credentials"
schema_url = "from-ur-credentials"
schema_api_key = "from-ur-credentials"
schema_api_secret = "from-ur-credentials"

# SASL/PLAIN over TLS, using the system CA store to verify the broker certificate
kafka = Kafka.new(
  seed_brokers: [kaf_broker],
  sasl_plain_username: kaf_api_key,
  sasl_plain_password: kaf_api_secret,
  client_id: "daniel-application",
  ssl_ca_certs_from_system: true
)


consumer = kafka.consumer(group_id: "daniel-consumer")
consumer.subscribe("ada_staging_pg_v3.my.service_items", start_from_beginning: false)
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }
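msg = [] # collects consumed messages for decoding afterwards

# each_message blocks until consumer.stop is called (see the signal traps above)
consumer.each_message do |message|
  puts message.value
  puts message.key
  msg << message
  puts "---------------"
end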
# The schema must be downloaded and stored in the local schema dir (schemas_path) first
avro = AvroTurf::Messaging.new(registry_url: schema_url, user: schema_api_key, password: schema_api_secret, schemas_path: 'tmp')
avro.decode(msg[0].value, schema_name: 'ada_staging_pg_v3.my.service_items.Value')
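
To work out which schema needs downloading, it can help to look at the header of a raw message. This is a minimal sketch, assuming the topic uses the Confluent wire format (a zero magic byte, a 4-byte big-endian schema ID, then the Avro binary payload), which is the format `AvroTurf::Messaging` reads. `confluent_header` is a hypothetical helper name and is not part of ruby-kafka or avro_turf.

```ruby
# Hypothetical helper: splits a raw Kafka message value into the schema ID
# and the Avro payload, assuming the Confluent wire format.
# Returns nil when the value is too short or the magic byte is not 0.
def confluent_header(raw_value)
  bytes = raw_value.to_s.b # binary copy, so byte offsets are reliable
  return nil if bytes.bytesize < 5

  # "C" = one unsigned byte, "N" = 32-bit unsigned integer, big-endian
  magic_byte, schema_id = bytes.unpack("CN")
  return nil unless magic_byte.zero?

  { schema_id: schema_id, payload: bytes.byteslice(5, bytes.bytesize - 5) }
end
```

Once the consumer has stopped, `confluent_header(msg[0].value)` would return the schema ID to look up in the registry, or `nil` if the message was not written in this format.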