Dummy Kafka Producer using Confluent Kafka
"""
Benchmarking Kafka on cloud to understand the latency and throughput
with async and sync when >>> acks=all
AKHQ used for checking the Topic
Future goal will be implementing this around a proper Avro schema
"""
import argparse
import time

import orjson
from confluent_kafka import Producer, version, libversion
# Benchmark params
start_time = time.time()
async_exec: bool = True
request_count: int = 10
payload_size: int = 0
# Convenience functions
def parse_args(description):
    """Parse command line arguments"""
    parser = argparse.ArgumentParser(description=description)
    # drop argparse's default "optional arguments" group so --help lists
    # -f and -t under the custom "required arguments" group only
    parser._action_groups.pop()
    required = parser.add_argument_group('required arguments')
    required.add_argument('-f',
                          dest="config_file",
                          help="path to the producer / consumer configuration file",
                          required=True)
    required.add_argument('-t',
                          dest="topic",
                          help="topic name",
                          required=True)
    return parser.parse_args()
def read_config(config_file):
    """Read configuration properties from the specified config file"""
    conf = {}
    with open(config_file) as fh:
        for line in fh:
            line = line.strip()
            # skip blank lines and comments
            if len(line) != 0 and line[0] != "#":
                parameter, value = line.split('=', 1)
                conf[parameter.strip()] = value.strip()
    return conf
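# For example, a properties file containing the two lines
#   acks=all
#   linger.ms=100
# parses to {"acks": "all", "linger.ms": "100"}, a dict that the
# confluent_kafka Producer constructor accepts directly.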
# Initialization
args = parse_args(description="Simple Producer to send some sample messages to Kafka brokers")
config_file = args.config_file
topic = args.topic
conf = read_config(config_file)
# Create Producer instance
producer = Producer(conf)
# Optional per-message delivery handler, triggered by poll() or flush(),
# called once a message has been successfully delivered or has permanently
# failed delivery (after retries).
def delivery_callback(err, msg):
    """Delivery report handler called on successful or failed delivery of message"""
    if err is not None:
        print(f"Failed to deliver message: {err}")
    else:
        print(f"Message delivered to {msg.topic()} (partition {msg.partition()}) at offset {msg.offset()}")
for n in range(request_count):
    # key is an 11-character, zero-padded string counter
    key = str(n).zfill(11)[-11:]
    value = payload = orjson.dumps(
        {
            "name": "itssoap",
            "email": "admin@itssoap.ninja"
        }).decode("utf-8")
    payload_size += len(payload)
    print(f"Producing message to {topic}: key={key}, value={value}")
    producer.produce(topic,
                     key=key.encode("utf-8"),
                     value=value,
                     on_delivery=delivery_callback)
    if not async_exec:  # synchronicity depends on whether you flush per message or per batch
        producer.flush()
    # serve queued delivery callbacks without blocking
    producer.poll(0)
print("Waiting for delivery results")
if async_exec:
    producer.flush()
print("Done")
print("-------------------------------")
print("Debug info:")
print("--- %s seconds ---" % (time.time() - start_time))
print(f"Payload total size: {payload_size} bytes")
print(f"Confluent version: {version()}")
print(f"Confluent Library version: {libversion()}")
itssoap commented Nov 10, 2023

Execution:

py kafkaproducer.py -f producer.properties -t topicName

Dummy *.properties file:

bootstrap.servers=localhost:9092

security.protocol=SASL_SSL

client.dns.lookup=use_all_dns_ips
acks=all
linger.ms=100
compression.type=gzip

sasl.mechanism=PLAIN
sasl.username=NKDJCNEIFEHFOIEFJ
sasl.password=ksjiufjf3ur93joidfenflkefnlfwf+/fwffjn
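
As a code-based alternative to AKHQ for checking what landed on the topic, here is a minimal consumer sketch that reuses read_config from the script above; the group.id, the offset reset, and the property filter are assumptions added for the consumer:

from confluent_kafka import Consumer

base = read_config("producer.properties")
# keep only connection/auth properties; producer-only settings such as
# acks or linger.ms do not apply to a consumer
conf = {k: v for k, v in base.items()
        if k.startswith(("bootstrap", "security", "sasl", "client.dns"))}
conf["group.id"] = "benchmark-check"   # assumed consumer group
conf["auto.offset.reset"] = "earliest"

consumer = Consumer(conf)
consumer.subscribe(["topicName"])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"key={msg.key()}, value={msg.value()}, offset={msg.offset()}")
finally:
    consumer.close()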
