Skip to content

Instantly share code, notes, and snippets.

@ricjcosme
Created June 11, 2017 17:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ricjcosme/b609dc22bb653645462398133176ed02 to your computer and use it in GitHub Desktop.
Save ricjcosme/b609dc22bb653645462398133176ed02 to your computer and use it in GitHub Desktop.
import confluent_kafka
#import json
#import msgpack
c = confluent_kafka.Consumer({'bootstrap.servers': 'kafka', 'group.id': 'PyCharm_consumer',
'default.topic.config': {'auto.offset.reset': 'latest'}})
c.subscribe(['ping'])
running = True
i = 0
while running:
msg = c.poll()
if not msg.error():
i = i + 1
j = msg.value()
#j = json.loads(msgpack.unpackb(msg.value()))
#j = json.loads(msg.value())
print str(i) + ": " + str(j)
elif msg.error().code() != confluent_kafka.KafkaError._PARTITION_EOF:
print(msg.error())
running = False
c.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment