Skip to content

Instantly share code, notes, and snippets.

@litao3rd
Last active March 13, 2019 03:35
Show Gist options
  • Save litao3rd/c7d557026464fb1ff9b059a4e9b3c97c to your computer and use it in GitHub Desktop.
Save litao3rd/c7d557026464fb1ff9b059a4e9b3c97c to your computer and use it in GitHub Desktop.
Gist for a confluent_kafka consumer
"""
>>> import confluent_kafka
>>> confluent_kafka.version()
('0.11.6', 722432)
>>> confluent_kafka.libversion()
('1.0.0-RC7-6-g9695c0-dirty', 16777471)
"""
import sys

from confluent_kafka import Consumer, KafkaError
# Minimal consumer example: subscribe to one topic and print a one-line
# summary (topic:partition:offset, key, payload length) for every message.
c = Consumer({
    'bootstrap.servers': 'brokerserver',
    # Do not probe the broker for its supported API versions; assume the
    # 0.9.0.1 protocol instead. The original author reported receiving no
    # messages at all without this fallback setting.
    'api.version.request': False,
    'broker.version.fallback': "0.9.0.1",
    'group.id': 'group_test',
    'auto.offset.reset': 'beginning',
    'session.timeout.ms': 30000,
})
c.subscribe(['topic_test'])

try:
    while True:
        msg = c.poll(5.0)

        # poll() yields None when nothing arrived within the timeout, so this
        # check must come before any accessor is called on the message.
        if msg is None:
            sys.stderr.write("msg is None\n")
            continue

        # Error events are delivered as messages too; report and skip them
        # rather than formatting them as regular records.
        if msg.error():
            sys.stderr.write("Consumer error: {}\n".format(msg.error()))
            continue

        # A message may carry no payload (value() is None); report length 0
        # instead of crashing in len().
        value = msg.value()
        value_len = len(value) if value is not None else 0

        recv = "%s:%d:%d: key=%s value_len=%d" % (
            msg.topic(), msg.partition(), msg.offset(), msg.key(), value_len)
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print(recv)
        # To show the payload itself instead of its length:
        # print('Received message: {}'.format(value.decode('utf-8')))
except KeyboardInterrupt:
    # Ctrl-C is the only way out of the loop; fall through to the cleanup.
    pass
finally:
    # Always close the consumer so it leaves the group and commits final
    # offsets, even when the loop is interrupted or raises.
    c.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment