Skip to content

Instantly share code, notes, and snippets.

@markhu
Created February 10, 2021 21:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save markhu/8604d9b5e8bd916a9d959409cf45ddba to your computer and use it in GitHub Desktop.
Save markhu/8604d9b5e8bd916a9d959409cf45ddba to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# kafka-listen.py # optional args: -a for all, or regex pattern of topics
import datetime, json, os, sys
import kafka # pip install kafka-python
KAFKA_SERVER = "kafka-esque.servicebus.windows.net:9093"
KAFKA_TOPIC = "test-topic"
# The SASL password is read from the environment so it is not hard-coded here.
# The fallback text is only a reminder for the operator, not a usable secret.
KAFKA_PASSWORD = os.environ.get("KAFKA_PASSWORD","password required, as in jaas.conf")
# Default topic regex shown in the startup banner; None means "no regex".
PATTERN = None

# Command-line spellings that mean "subscribe to every available topic".
ALL_TOPICS_FLAGS = ("-a", "--all")


def decode_value(raw):
    """Decode a raw record value to text.

    Args:
        raw: The record value as bytes, or None for a record without a value.

    Returns:
        The UTF-8 decoded text, or None when ``raw`` is None. Bytes that are
        not valid UTF-8 are replaced rather than raising, so one bad record
        cannot stop the listener.
    """
    if raw is None:
        return None
    return raw.decode('utf-8', errors='replace')


def is_all_topics_flag(arg):
    """Return True when ``arg`` is exactly one of the "all topics" flags.

    An exact match is used so that a literal topic name or pattern that merely
    contains "-a" (for example "my-app") is still treated as a pattern.
    """
    return arg in ALL_TOPICS_FLAGS


def render_payload(value):
    """Return the text to print for one decoded record value.

    Args:
        value: Decoded record text, or None for a record without a value.

    Returns:
        The value re-serialized through ``json`` when it parses as JSON,
        "null" when ``value`` is None, and the text unchanged when it is not
        valid JSON.
    """
    if value is None:
        return json.dumps(None)
    try:
        return json.dumps(json.loads(value))
    except ValueError:
        # json.JSONDecodeError is a ValueError; show non-JSON payloads verbatim.
        return value


def main(argv=None):
    """Subscribe to the requested topics and print every record received.

    Args:
        argv: Argument list in ``sys.argv`` layout (program name first).
            Defaults to ``sys.argv``. ``argv[1]`` may be "-a"/"--all" to
            subscribe to every available topic, or a regex of topic names.
            With no argument only KAFKA_TOPIC is consumed.
    """
    argv = sys.argv if argv is None else argv
    consumer = kafka.KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=[KAFKA_SERVER],
        auto_offset_reset='earliest',
        enable_auto_commit=False,
        group_id='$Default',
        sasl_mechanism='PLAIN',
        sasl_plain_password=KAFKA_PASSWORD,
        sasl_plain_username='$ConnectionString',
        security_protocol='SASL_SSL',
        value_deserializer=decode_value,
    )
    try:
        pattern = PATTERN
        if len(argv) > 1:
            if is_all_topics_flag(argv[1]):
                # subscribe to all available topics
                consumer.subscribe(topics=sorted(consumer.topics()))
            else:
                pattern = argv[1]
                # subscribe to topic(s) matching pattern
                consumer.subscribe(pattern=pattern)

        # One-line startup banner: topics, optional regex, timestamp.
        print("subscription topics: %s" % (list(consumer.subscription())), end='')
        print((" + re('%s')" % pattern) if pattern else '', end='')
        print(" as of %s" % (datetime.datetime.now()))

        for message in consumer:
            print("\n%s [%s]" % (datetime.datetime.now(), message.topic), end=' ')
            print("%s" % (render_payload(message.value)))
    finally:
        # Release sockets and leave the consumer group even on Ctrl-C or error.
        consumer.close()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment