Created
September 3, 2021 23:45
-
-
Save mertyildiran/edcbdf3abd6a88d478d1d16cc5c2ddc2 to your computer and use it in GitHub Desktop.
Kafka Py demo code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from confluent_kafka.admin import AdminClient, NewTopic | |
a = AdminClient({'bootstrap.servers': 'kafka:9092'}) | |
new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in ["topic1", "topic2"]] | |
# Note: In a multi-cluster production scenario, it is more typical to use a replication_factor of 3 for durability. | |
# Call create_topics to asynchronously create topics. A dict | |
# of <topic,future> is returned. | |
print("CreateTopic") | |
fs = a.create_topics(new_topics) | |
# Wait for each operation to finish. | |
for topic, f in fs.items(): | |
try: | |
f.result() # The result itself is None | |
print("Topic {} created".format(topic)) | |
except Exception as e: | |
print("Failed to create topic {}: {}".format(topic, e)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from confluent_kafka import Consumer | |
c = Consumer({ | |
'bootstrap.servers': 'kafka:9092', | |
'group.id': 'mygroup', | |
'auto.offset.reset': 'earliest' | |
}) | |
c.subscribe(['mytopic']) | |
while True: | |
print("Fetch") | |
msg = c.poll(1.0) | |
if msg is None: | |
continue | |
if msg.error(): | |
print("Consumer error: {}".format(msg.error())) | |
continue | |
print('Received message: {}'.format(msg.value().decode('utf-8'))) | |
c.close() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from confluent_kafka import Producer | |
p = Producer({'bootstrap.servers': 'kafka:9092'}) | |
def delivery_report(err, msg): | |
""" Called once for each message produced to indicate delivery result. | |
Triggered by poll() or flush(). """ | |
if err is not None: | |
print('Message delivery failed: {}'.format(err)) | |
else: | |
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) | |
for data in [ | |
"a", | |
"b", | |
"c" | |
]: | |
# Trigger any available delivery report callbacks from previous produce() calls | |
p.poll(0) | |
# Asynchronously produce a message, the delivery report callback | |
# will be triggered from poll() above, or flush() below, when the message has | |
# been successfully delivered or failed permanently. | |
print("Produce") | |
p.produce('mytopic', data.encode('utf-8'), callback=delivery_report) | |
# Wait for any outstanding messages to be delivered and delivery report | |
# callbacks to be triggered. | |
p.flush() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
python3 consume.py & | |
while true; do | |
echo "-----" && \ | |
python3 admin_client.py && \ | |
python3 produce.py && \ | |
sleep 5 | |
done |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment