Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!./.venv/bin/python3
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from pprint import pformat
# Client settings handed to confluent_kafka.Consumer; keys are librdkafka
# configuration property names.
#
# 'queue.buffering.max.ms' and 'message.send.max.retries' are intentionally
# not set here: librdkafka documents both as producer-only properties, so a
# consumer ignores them and only emits a configuration warning at startup.
config = {
    'bootstrap.servers': 'kafka-test-01.uit.tufts.edu:9092',
    # NOTE(review): older librdkafka releases list retry.backoff.ms as
    # producer-only as well; kept because newer releases appear to apply it
    # to all clients -- confirm against the installed librdkafka version.
    'retry.backoff.ms': 200,
    'message.max.bytes': 104857600,  # 100 MiB
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'admin',
    # NOTE(review): placeholder credential; do not commit a real password
    # here -- load it from the environment or a secrets store instead.
    'sasl.password': 'XXXXXXXXX',
    # group.id is a string-valued property, so it is given as a string
    # rather than relying on the client to stringify an int.
    'group.id': '1',
}
def print_assignment(consumer, partitions):
    """Print the partitions handed to this consumer.

    Used as the ``on_assign`` callback of ``Consumer.subscribe``. The
    ``consumer`` argument is accepted to match the callback signature but
    is not used.
    """
    print('Assignment: %s' % (partitions,))
topics = ["drio-test"]

# Logger handed to the Kafka client. A handler is attached explicitly:
# with no handler on this logger (and none configured elsewhere), the
# logging module falls back to its last-resort handler, which only emits
# WARNING and above, so the DEBUG level set here would have no effect.
logger = logging.getLogger('consumer')
logger.setLevel(logging.DEBUG)
if not logger.handlers:
    # Guard against stacking duplicate handlers if this code runs twice in
    # the same interpreter. StreamHandler writes to stderr by default, which
    # keeps stdout reserved for message payloads.
    handler = logging.StreamHandler()
    handler.setFormatter(
        logging.Formatter('%(asctime)-15s %(levelname)-8s %(message)s'))
    logger.addHandler(handler)

c = Consumer(config, logger=logger)
c.subscribe(topics, on_assign=print_assignment)

# Read messages from Kafka, print to stdout
try:
    while True:
        msg = c.poll(timeout=1.0)
        if msg is None:
            # Nothing was returned within the poll timeout; poll again.
            continue
        if msg.error():
            raise KafkaException(msg.error())

        # Proper message: metadata goes to stderr, the payload to stdout.
        sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
                         (msg.topic(), msg.partition(), msg.offset(),
                          str(msg.key())))
        print(msg.value())
except KeyboardInterrupt:
    sys.stderr.write('%% Aborted by user\n')
finally:
    # Close down consumer to commit final offsets.
    c.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment