Skip to content

Instantly share code, notes, and snippets.

@drio
Created February 17, 2022 21:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drio/afa0d4e13c872f28ec2a9f4d5ab6d97a to your computer and use it in GitHub Desktop.
Save drio/afa0d4e13c872f28ec2a9f4d5ab6d97a to your computer and use it in GitHub Desktop.
from confluent_kafka import Consumer, KafkaError, OFFSET_BEGINNING
class SimpleConsumer:
    """Generator-style consumer for a single Kafka topic.

    Builds a ``confluent_kafka`` client configuration once in ``__init__``
    and creates the actual ``Consumer`` lazily inside :meth:`run`, which
    yields successfully received messages one at a time.
    """

    def __init__(self, brokers: str, topic: str, consumer_group: str):
        """Store the topic name and prepare the client configuration.

        Args:
            brokers: Value used for ``bootstrap.servers``.
            topic: Name of the topic :meth:`run` subscribes to.
            consumer_group: Value used for ``group.id``.
        """
        self.topic = topic
        # All settings live at the top level. The nested
        # 'default.topic.config' dict is deprecated by the client, and the
        # auto-commit settings placed inside it are topic-scoped legacy
        # properties rather than the consumer-level ones.
        self.config = {
            'bootstrap.servers': brokers,
            'group.id': consumer_group,
            'enable.auto.offset.store': True,
            'max.partition.fetch.bytes': 104857600,  # 100 MiB
            'auto.commit.interval.ms': 2000,
            'auto.offset.reset': 'smallest',
            'enable.auto.commit': True,
        }

    def run(self, consume_from_beginning: bool = False):
        """Poll the topic forever and yield every message without an error.

        This is a generator: nothing happens until it is iterated. The
        underlying consumer is closed when the generator is closed,
        garbage-collected, or terminated by an exception.

        Args:
            consume_from_beginning: When true, every partition handed to
                this consumer is rewound to ``OFFSET_BEGINNING`` on
                assignment instead of resuming from the stored offset.

        Yields:
            Messages returned by ``Consumer.poll`` whose ``error()`` is
            ``None``. Partition-EOF events are skipped silently; any other
            error is printed and polling continues.
        """

        def _rewind(_consumer, partitions):
            # on_assign callback: force each assigned partition back to
            # its first available offset before accepting the assignment.
            for partition in partitions:
                partition.offset = OFFSET_BEGINNING
            _consumer.assign(partitions)

        consumer = Consumer(self.config)
        try:
            # Subscribe exactly once; the callback is only needed when
            # the caller asked to replay the topic.
            if consume_from_beginning:
                consumer.subscribe([self.topic], on_assign=_rewind)
            else:
                consumer.subscribe([self.topic])

            while True:
                msg = consumer.poll(timeout=1.0)
                # Compare against None explicitly: a Message's truthiness
                # follows the length of its payload, so a valid record
                # with an empty value would otherwise be discarded.
                if msg is None:
                    continue

                kafka_error = msg.error()
                if kafka_error is None:
                    # Hand the message to the caller.
                    yield msg
                elif kafka_error.code() != KafkaError._PARTITION_EOF:
                    # Anything other than "reached end of partition" is
                    # reported; the loop keeps polling afterwards.
                    print(f'Error message received from the kafka topic {self.topic}')
                    print(kafka_error)
        finally:
            consumer.close()
if __name__ == '__main__':
    # Demo entry point: stream a local test topic and print each payload.
    demo = SimpleConsumer('localhost:29092', 'jon_test_topic', 'some_group')
    stream = demo.run()
    for record in stream:
        print(record.value())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment