Last active
February 1, 2024 20:53
-
-
Save itssoap/9071397e3a38d9a66519ab52297a131b to your computer and use it in GitHub Desktop.
Dummy Kafka Producer using Confluent Kafka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Benchmarking Kafka on cloud to understand the latency and throughput | |
with async and sync when >>> acks=all | |
AKHQ used for checking the Topic | |
Future goal will be implementing this around a proper Avro schema | |
""" | |
import orjson | |
from confluent_kafka import Producer, version, libversion | |
import time | |
import argparse | |
# Benchmark params | |
start_time = time.time() | |
async_exec: bool = True | |
request_count: int = 10 | |
payload_size: int = 0 | |
# Convenience functions | |
def parse_args(description): | |
"""Parse command line arguments""" | |
parser = argparse.ArgumentParser(description=description) | |
parser._action_groups.pop() | |
required = parser.add_argument_group('required arguments') | |
required.add_argument('-f', | |
dest="config_file", | |
help="path to the producer / consumer configuration file", | |
required=True) | |
required.add_argument('-t', | |
dest="topic", | |
help="topic name", | |
required=True) | |
args = parser.parse_args() | |
return args | |
def read_config(config_file): | |
"""Read configuration properties from the specified config file""" | |
conf = {} | |
with open(config_file) as fh: | |
for line in fh: | |
line = line.strip() | |
if len(line) != 0 and line[0] != "#": | |
parameter, value = line.strip().split('=', 1) | |
conf[parameter.strip()] = value.strip() | |
return conf | |
# Initialization | |
args = common.parse_args(description="Simple Producer to send some sample messages to Kafka Brokers") | |
config_file = args.config_file | |
topic = args.topic | |
conf = common.read_config(config_file) | |
# Create Producer instance | |
producer = Producer(conf) | |
# Optional per-message delivery handler (triggered by poll() or flush()) when a message | |
# has been successfully delivered or permanently failed delivery (after retries). | |
def delivery_callback(err, msg): | |
"""Delivery report handler called on successful or failed delivery of message""" | |
if err is not None: | |
print(f"Failed to deliver message: {err}") | |
else: | |
print(f"Message delivered to {msg.topic()} (partition {msg.partition()}) at offset {msg.offset()}") | |
for n in range(request_count): | |
# key is a CHAR11 iterator | |
key = str(n).zfill(11)[-11:] #random.randint(0, 1000) | |
# value = f"Test {key}" | |
value = payload = orjson.dumps( | |
{ | |
"name" : "itssoap", | |
"email": "admin@itssoap.ninja" | |
}).decode("utf-8") | |
payload_size += len(payload) | |
print(f"Producing message to {topic}: key={key}, value={value}") | |
producer.produce(topic, | |
key=bytes(str(key), encoding='utf-8'), #,key.to_bytes(length=8, byteorder="big"), | |
value=value, | |
on_delivery=delivery_callback) | |
if not async_exec: # synchronousity depends on whether you flush per call or per batch | |
producer.flush() | |
producer.poll(0) | |
print("Waiting for delivery results") | |
if async_exec: | |
producer.flush() | |
print("Done") | |
print("-------------------------------") | |
print("Debug info:") | |
print("--- %s seconds ---" % (time.time() - start_time)) | |
print(f"Payload total size: {payload_size} bytes") | |
print(f"Confluent version: {version()}") | |
print(f"Confluent Library version: {libversion()}") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Execution:
Dummy *.properties file: