Skip to content

Instantly share code, notes, and snippets.

@mertyildiran
Created September 3, 2021 23:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mertyildiran/edcbdf3abd6a88d478d1d16cc5c2ddc2 to your computer and use it in GitHub Desktop.
Save mertyildiran/edcbdf3abd6a88d478d1d16cc5c2ddc2 to your computer and use it in GitHub Desktop.
Kafka Py demo code
from confluent_kafka.admin import AdminClient, NewTopic
a = AdminClient({'bootstrap.servers': 'kafka:9092'})
new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in ["topic1", "topic2"]]
# Note: In a multi-cluster production scenario, it is more typical to use a replication_factor of 3 for durability.
# Call create_topics to asynchronously create topics. A dict
# of <topic,future> is returned.
print("CreateTopic")
fs = a.create_topics(new_topics)
# Wait for each operation to finish.
for topic, f in fs.items():
try:
f.result() # The result itself is None
print("Topic {} created".format(topic))
except Exception as e:
print("Failed to create topic {}: {}".format(topic, e))
from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'kafka:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
})
c.subscribe(['mytopic'])
while True:
print("Fetch")
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()
from confluent_kafka import Producer
p = Producer({'bootstrap.servers': 'kafka:9092'})
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
for data in [
"a",
"b",
"c"
]:
# Trigger any available delivery report callbacks from previous produce() calls
p.poll(0)
# Asynchronously produce a message, the delivery report callback
# will be triggered from poll() above, or flush() below, when the message has
# been successfully delivered or failed permanently.
print("Produce")
p.produce('mytopic', data.encode('utf-8'), callback=delivery_report)
# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()
#!/bin/bash
python3 consume.py &
while true; do
echo "-----" && \
python3 admin_client.py && \
python3 produce.py && \
sleep 5
done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment