@william8th
Last active March 15, 2019 15:12
Kafka Avro Consumer Python
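from confluent_kafka.avro import AvroConsumer
from confluent_kafka.avro.serializer import SerializerError

# Consumer configuration: Kafka broker, consumer group, and Schema Registry endpoint.
c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'groupid',
    'schema.registry.url': 'http://127.0.0.1:8081'
})

c.subscribe(['reddit_posts'])

while True:
    try:
        # Wait up to 10 seconds for the next message.
        msg = c.poll(10)
    except SerializerError as e:
        # msg is not assigned when poll() raises, so report only the exception.
        print("Message deserialization failed: {}".format(e))
        break

    if msg is None:
        # No message arrived within the timeout.
        continue

    if msg.error():
        print("AvroConsumer error: {}".format(msg.error()))
        continue

    # The decoded Avro record is a dict; print the post's author.
    value = msg.value()
    print(value["author"])

c.close()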
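
The poll loop above needs a running broker and Schema Registry, which makes it awkward to try out. The following is a minimal sketch, not part of the original gist, that factors the same None/error/value handling into a function and exercises it against in-memory stand-ins. `FakeMessage`, `FakeConsumer`, and `collect_authors` are hypothetical names introduced only for illustration; they mimic just the `poll()`, `value()`, and `error()` calls used above and are not part of confluent-kafka.

```python
class FakeMessage:
    """Hypothetical stand-in for a Kafka message (value/error accessors only)."""

    def __init__(self, value=None, error=None):
        self._value = value
        self._error = error

    def value(self):
        return self._value

    def error(self):
        return self._error


class FakeConsumer:
    """Hypothetical stand-in: poll() returns queued messages, then None."""

    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout):
        return self._messages.pop(0) if self._messages else None


def collect_authors(consumer, max_polls, timeout=10):
    """Poll up to max_polls times and return the "author" of each good message."""
    authors = []
    for _ in range(max_polls):
        msg = consumer.poll(timeout)
        if msg is None:
            continue  # nothing arrived within the timeout
        if msg.error():
            continue  # skip error events, as the loop above does
        authors.append(msg.value()["author"])
    return authors


fake = FakeConsumer([
    FakeMessage(value={"author": "alice"}),
    FakeMessage(error="broker unavailable"),
    FakeMessage(value={"author": "bob"}),
])
print(collect_authors(fake, max_polls=5))  # prints ['alice', 'bob']
```

Assuming the real consumer exposes the same three calls, an `AvroConsumer` could be passed in place of `FakeConsumer`; bounding the loop with `max_polls` is a choice made here so the sketch terminates, whereas the original loops until a deserialization error.
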

Pinned dependencies (pip requirement format):
avro-python3==1.8.2
certifi==2019.3.9
chardet==3.0.4
confluent-kafka==0.11.6
idna==2.8
requests==2.21.0
urllib3==1.24.1