Skip to content

Instantly share code, notes, and snippets.

@highsmallxu
Last active February 21, 2021 21:35
Show Gist options
  • Save highsmallxu/cccb9c70389a1160864f53098b34ec66 to your computer and use it in GitHub Desktop.
Save highsmallxu/cccb9c70389a1160864f53098b34ec66 to your computer and use it in GitHub Desktop.
from confluent_kafka import Consumer, TopicPartition
# Module-level constant; not referenced by any code visible in this file.
size = 1_000_000

# Shared Kafka consumer instance that the example functions in this file
# operate on. It is created at import time with the settings below.
consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "mygroup",
        "auto.offset.reset": "earliest",
    }
)
def consume_session_window(consumer, timeout=1, session_max=5):
    """Yield messages from *consumer* until too many polls come back empty.

    The consumer is polled in a loop. Every poll that returns ``None`` is
    counted; once the count exceeds ``session_max`` the generator stops.
    Messages whose ``error()`` is truthy are reported on stdout and skipped.

    NOTE: the empty-poll counter is cumulative over the generator's
    lifetime -- it is not reset when a message arrives.

    Args:
        consumer: Object exposing ``poll(timeout)`` and ``close()``.
        timeout: Value passed to every ``consumer.poll`` call.
        session_max: Number of empty polls tolerated before stopping; the
            generator ends on empty poll number ``session_max + 1``.

    Yields:
        Each polled message whose ``error()`` is falsy.

    The consumer is closed when the generator finishes, is closed early by
    the caller, or exits because of an exception.
    """
    empty_polls = 0
    try:
        while True:
            message = consumer.poll(timeout)
            if message is None:
                empty_polls += 1
                if empty_polls > session_max:
                    break
                continue
            if message.error():
                # Report the error from the message just polled (the
                # previous code referenced an undefined name here).
                print("Consumer error: {}".format(message.error()))
                continue
            yield message
    finally:
        # Release the consumer even if iteration is abandoned part-way
        # or poll() raises.
        consumer.close()
def consume(consumer, timeout):
    """Yield messages from *consumer* indefinitely.

    The consumer is polled in an endless loop. Polls that return ``None``
    are ignored, and messages whose ``error()`` is truthy are reported on
    stdout and skipped.

    Args:
        consumer: Object exposing ``poll(timeout)`` and ``close()``.
        timeout: Value passed to every ``consumer.poll`` call.

    Yields:
        Each polled message whose ``error()`` is falsy.

    The loop never ends on its own, so the consumer is closed in a
    ``finally`` clause: it runs when the caller closes or discards the
    generator, or when an exception propagates out of it.
    """
    try:
        while True:
            message = consumer.poll(timeout)
            if message is None:
                continue
            if message.error():
                # Report the error from the message just polled (the
                # previous code referenced an undefined name here).
                print("Consumer error: {}".format(message.error()))
                continue
            yield message
    finally:
        # Previously this call sat after the infinite loop and could
        # never execute.
        consumer.close()
def confluent_consumer():
    """Subscribe the module-level consumer to ``topic1`` and print messages.

    Messages are pulled through ``consume`` with a poll timeout of 1.0 and
    each one is printed as it arrives.
    """
    topics = ['topic1']
    consumer.subscribe(topics)
    stream = consume(consumer, 1.0)
    for record in stream:
        print(record)
def confluent_consumer_partition():
    """Assign partition 0 of ``topic1`` to the module-level consumer and print messages.

    Messages are pulled through ``consume`` with a poll timeout of 1.0 and
    each one is printed as it arrives.
    """
    partition = TopicPartition("topic1", 0)
    consumer.assign([partition])
    stream = consume(consumer, 1.0)
    for record in stream:
        print(record)
@HarisMichailidis
Copy link

Hi there,
Thanks a lot for providing these examples!

A typo I found:
on L22 and L33, it should be `message.error()` instead of `msg.error()`.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment