Skip to content

Instantly share code, notes, and snippets.

@drio
Created February 17, 2022 21:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save drio/b5262a864df722ade9ba311fca0e9250 to your computer and use it in GitHub Desktop.
Save drio/b5262a864df722ade9ba311fca0e9250 to your computer and use it in GitHub Desktop.
#!./.venv/bin/python3
from confluent_kafka import Consumer, KafkaException
import sys
import getopt
import json
import logging
from pprint import pformat
config = {
'bootstrap.servers': 'kafka-test-01.uit.tufts.edu:9092',
'queue.buffering.max.ms': 1,
'message.send.max.retries': 5,
'retry.backoff.ms': 200,
'message.max.bytes': 104857600,
}
config['security.protocol'] = 'SASL_SSL'
config['sasl.mechanisms'] = 'PLAIN'
config['sasl.username'] = 'admin'
config['sasl.password'] = 'XXXXXXXXX'
config['group.id'] = 1
def print_assignment(consumer, partitions):
print('Assignment:', partitions)
topics = ["drio-test"]
logger = logging.getLogger('consumer')
logger.setLevel(logging.DEBUG)
c = Consumer(config, logger=logger)
c.subscribe(topics, on_assign=print_assignment)
# Read messages from Kafka, print to stdout
try:
while True:
msg = c.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
raise KafkaException(msg.error())
else:
# Proper message
sys.stderr.write('%% %s [%d] at offset %d with key %s:\n' %
(msg.topic(), msg.partition(), msg.offset(),
str(msg.key())))
print(msg.value())
except KeyboardInterrupt:
sys.stderr.write('%% Aborted by user\n')
finally:
# Close down consumer to commit final offsets.
c.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment