Skip to content

Instantly share code, notes, and snippets.

@hrchu
Created July 19, 2018 11:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hrchu/7a9a60082c1be56e7af8fc313e7a03da to your computer and use it in GitHub Desktop.
Save hrchu/7a9a60082c1be56e7af8fc313e7a03da to your computer and use it in GitHub Desktop.
from confluent_kafka import Consumer, KafkaError
c = Consumer({
'bootstrap.servers': 'mybroker',
'group.id': 'mygroup',
'default.topic.config': {
'auto.offset.reset': 'smallest'
}
})
c.subscribe(['mytopic'])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print(msg.error())
break
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment