@hrchu
Created March 15, 2019 11:41
Understanding the confluent-kafka-python high-level Consumer
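import logging

from confluent_kafka import Consumer, KafkaError

logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)


def print_assignment(consumer, partitions):
    # Rebalance callback: log the partitions assigned to this consumer
    logger.info('Assignment: %s', partitions)


c = Consumer({
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'largest'  # legacy alias of 'latest'
})
c.subscribe(['mytopic'], on_assign=print_assignment)  # nothing happens here; the callback fires later, inside poll()
try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error() is not None:
            # Error or event
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition event (the "reached end" log lines below)
                logger.info('%% %s [%d] reached end at offset %d',
                            msg.topic(), msg.partition(), msg.offset())
            else:
                logger.error('Consumer error: %s', msg.error())
            continue
        print('Received message: {}'.format(msg.value().decode('utf-8')))
finally:
    c.close()  # leave the group cleanly, even on Ctrl-C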
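# Note for newer client versions: as far as I know, librdkafka 1.0 and later ship with
# 'enable.partition.eof' disabled by default, so the _PARTITION_EOF branch above only fires
# when it is switched on explicitly. A minimal config sketch, reusing the placeholder broker
# and group names from the code above (the variable name conf is just for illustration):

```python
# Consumer configuration that explicitly asks for partition EOF events.
# 'mybroker' and 'mygroup' are the placeholders used in the snippet above.
conf = {
    'bootstrap.servers': 'mybroker',
    'group.id': 'mygroup',
    'auto.offset.reset': 'largest',   # legacy alias of 'latest'
    'enable.partition.eof': True,     # emit _PARTITION_EOF when the end of a partition is reached
}

# This dict would be passed as Consumer(conf) in place of the inline dict.
print(conf['enable.partition.eof'])
```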
## Assume no committed offset is stored, and 'auto.offset.reset': 'largest'
# Offsets are zero-based
# print_assignment runs during the first c.poll(1.0), not during c.subscribe(['mytopic'])
2019-03-15 19:26:59,475 INFO Assignment: [TopicPartition{topic=TestKafkaCommander,partition=0,offset=-1001,error=None}]
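# The offset=-1001 in the Assignment line is not a real position. It is librdkafka's
# "invalid / not set" logical offset, so the consumer falls back to the committed offset or,
# as assumed here, to auto.offset.reset. A small sketch of the logical offset values; the
# constants are redefined locally so it runs without the library (confluent_kafka exports
# constants with the same names), and describe_offset is a hypothetical helper:

```python
# Logical (non-absolute) offsets used by librdkafka, redefined here for illustration.
OFFSET_BEGINNING = -2     # start of the partition
OFFSET_END = -1           # end of the partition
OFFSET_STORED = -1000     # use the committed offset
OFFSET_INVALID = -1001    # no offset set on the TopicPartition

SPECIAL_OFFSETS = {
    OFFSET_BEGINNING: 'OFFSET_BEGINNING',
    OFFSET_END: 'OFFSET_END',
    OFFSET_STORED: 'OFFSET_STORED',
    OFFSET_INVALID: 'OFFSET_INVALID',
}


def describe_offset(offset: int) -> str:
    """Hypothetical helper: name a logical offset, or report it as an absolute one."""
    return SPECIAL_OFFSETS.get(offset, 'absolute offset {}'.format(offset))


print(describe_offset(-1001))  # the value seen in the Assignment log line
print(describe_offset(25))
```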
# The consumer reaches partition[25]; at this point partition[25] is None (nothing has been written at offset 25 yet)
2019-03-15 19:26:59,580 INFO % TestKafkaCommander [0] reached end at offset 25
# The producer writes one message. The offset reported on the producer side (31) does not match the offset the consumer sees for the message (25)
2019-03-15 19:27:01,455 INFO % Message delivered to TestKafkaCommander [0] @ 31
# The consumer reads the new message at partition[25]; only now does partition[25] hold a value
2019-03-15 19:27:01,455 INFO % TestKafkaCommander [0] at offset 25 with key b'qq':
# The consumer polls again. This time it receives the partition EOF error message, which means it has reached the empty slot partition[26]
2019-03-15 19:27:01,555 INFO % TestKafkaCommander [0] reached end at offset 26
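# The sequence above (EOF at 25, message at 25, EOF at 26) can be reproduced with a toy
# in-memory model. This is only a sketch of the offset arithmetic, not how librdkafka is
# implemented; ToyPartition and ToyConsumer are hypothetical names, and the 25 pre-existing
# messages are assumed from the log lines above:

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass
class ToyPartition:
    """Hypothetical in-memory stand-in for one partition log (not a Kafka API)."""
    log: List[bytes] = field(default_factory=list)

    def append(self, value: bytes) -> int:
        """Append a message and return the zero-based offset it was stored at."""
        self.log.append(value)
        return len(self.log) - 1


@dataclass
class ToyConsumer:
    """Hypothetical consumer that reports EOF once each time it catches up."""
    partition: ToyPartition
    position: int              # offset of the next message to read
    eof_reported: bool = False

    def poll(self) -> Optional[Tuple[str, int]]:
        if self.position < len(self.partition.log):
            offset = self.position
            self.position += 1
            self.eof_reported = False
            return ('message', offset)
        if not self.eof_reported:
            self.eof_reported = True
            # The EOF event carries the offset of the still-empty next slot.
            return ('eof', self.position)
        return None  # nothing new, like poll() timing out


partition = ToyPartition(log=[b'old'] * 25)      # offsets 0..24 already written
consumer = ToyConsumer(partition, position=25)   # 'largest': start at the end of the log

events = [consumer.poll()]        # caught up: EOF at 25
partition.append(b'new')          # the producer writes offset 25
events.append(consumer.poll())    # message at 25
events.append(consumer.poll())    # caught up again: EOF at 26
print(events)  # [('eof', 25), ('message', 25), ('eof', 26)]
```

# In this model the EOF offset is always the offset the next message will get, which is why
# the same number (25) shows up first in an EOF line and then in a message line.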