require 'kafka'
require 'avro_turf/messaging'
kaf_broker = "from-ur-credentials"
kaf_api_key = "from-ur-credentials"
kaf_api_secret = "from-ur-credentials"
schema_url = "from-ur-credentials"
schema_api_key = "from-ur-credentials"
schema_api_secret = "from-ur-credentials"
kafka=Kafka.new(
seed_brokers: kaf_broker, sasl_plain_username: kaf_api_key, sasl_plain_password: kaf_api_secret, client_id: "daniel-application",
ssl_ca_certs_from_system: true
)
consumer = kafka.consumer(group_id: "daniel-consumer")
consumer.subscribe("ada_staging_pg_v3.my.service_items", start_from_beginning: false)
trap("TERM") { consumer.stop }
trap("INT") { consumer.stop }
msg= []
consumer.each_message do |message|
puts message.value
puts message.key
msg << message
puts "---------------"
end
# need to download the schema and store it in the local schema dir
avro = AvroTurf::Messaging.new(registry_url: schema_url, user: schema_api_key, password: schema_api_secret, schemas_path: 'tmp')
avro.dencode(msg[0].value, schema_name: 'ada_staging_pg_v3.my.service_items.Value')
Last active
December 19, 2022 13:54
-
-
Save khun84/a539c1ceeac986973a707266babb572b to your computer and use it in GitHub Desktop.
Kafka
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment