Skip to content

Instantly share code, notes, and snippets.

@xshapira
Forked from iKunalChhabra/kafka_connect.py
Created January 21, 2023 10:19
Show Gist options
  • Save xshapira/21d57ac3082e9ec68c6e9f428d3feceb to your computer and use it in GitHub Desktop.
Save xshapira/21d57ac3082e9ec68c6e9f428d3feceb to your computer and use it in GitHub Desktop.
A small Kafka consumer/producer wrapper class in Python, built on confluent-kafka
from confluent_kafka import Consumer, Producer, KafkaError, KafkaException
class Kafka:
    """Thin convenience wrapper around confluent_kafka for a single topic.

    Relies on ``Consumer``, ``Producer``, ``KafkaError`` and ``KafkaException``
    being imported from ``confluent_kafka`` at module level.
    """

    def __init__(self, bootstrap_server, topic, timeout=1.0):
        """
        :param bootstrap_server: broker address string, e.g. 'localhost:9092'
        :param topic: topic name used by both consume() and produce()
        :param timeout: poll timeout in seconds for consumer/producer polling
        """
        self.__bootstrap_server = bootstrap_server
        self.__topic = topic
        self.__timeout = timeout

    @staticmethod
    def __on_commit(err, partitions):
        # Default commit callback: report the outcome of async offset commits.
        if err:
            print(str(err))
        else:
            print(f"Message committed to Topic = {partitions[0].topic}, "
                  f"Partition = {partitions[0].partition}, Offset = {partitions[0].offset}")

    @staticmethod
    def __callback(err, msg):
        # Default delivery-report callback for produced messages.
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to Topic = {msg.topic()}, Key = {msg.key()}, '
                  f'Partition = {msg.partition()}, Offset = {msg.offset()}')

    def __generate_consumer_properties(self, group_id, conf, key_deserializer,
                                       value_deserializer, on_commit):
        """Build the consumer configuration, filling in defaults where needed.

        Returns ``(conf, key_deserializer, value_deserializer)``. When no user
        config is given, opinionated defaults are applied (manual commits,
        read from the earliest available offset); otherwise user keys override
        the generated defaults.
        """
        default_conf = {'bootstrap.servers': self.__bootstrap_server,
                        'group.id': group_id}
        if not conf:
            conf = default_conf
            conf['enable.auto.commit'] = False
            conf['auto.offset.reset'] = 'earliest'
        else:
            # User-supplied keys win over defaults on collision.
            conf = {**default_conf, **conf}
        # Identity (de)serializers pass raw bytes through unchanged.
        key_deserializer = key_deserializer or (lambda x: x)
        value_deserializer = value_deserializer or (lambda x: x)
        conf['on_commit'] = on_commit or self.__on_commit
        return conf, key_deserializer, value_deserializer

    def __generate_producer_properties(self, conf, key_serializer,
                                       value_serializer, callback):
        """Build the producer configuration, filling in safe defaults.

        Returns ``(conf, key_serializer, value_serializer, callback)``. The
        defaults favour durability (acks=all, idempotence, retries) and
        moderate batching; user-supplied keys override the defaults.
        """
        default_conf = {'bootstrap.servers': self.__bootstrap_server}
        if not conf:
            conf = default_conf
            conf['acks'] = 'all'
            conf['retries'] = 5
            conf['linger.ms'] = 20
            conf['batch.num.messages'] = 1000
            conf['batch.size'] = 1024 * 64  # 64KB
            conf['compression.codec'] = 'snappy'
            conf['enable.idempotence'] = True
            conf['delivery.timeout.ms'] = 120000  # 2 minutes
            conf['max.in.flight.requests.per.connection'] = 5
        else:
            conf = {**default_conf, **conf}
        key_serializer = key_serializer or (lambda x: x)
        value_serializer = value_serializer or (lambda x: x)
        callback = callback or self.__callback
        return conf, key_serializer, value_serializer, callback

    def consume(self, group_id, conf=None, key_deserializer=None,
                value_deserializer=None, on_commit=None):
        """Generator yielding messages from the topic.

        Each yielded message is a dict with keys 'Topic', 'Value', 'Key',
        'Offset' and 'Partition'; its offset is committed asynchronously
        after the yield. Runs until the caller stops iterating; the consumer
        is closed on exit.

        :param group_id: consumer group id
        :param conf: optional config dict merged over the defaults
        :param key_deserializer: callable applied to the raw key bytes
        :param value_deserializer: callable applied to the raw value bytes
        :param on_commit: optional commit callback (defaults to a logger)
        :raises KafkaException: on any non-EOF message error
        """
        conf, key_deserializer, value_deserializer = self.__generate_consumer_properties(
            group_id, conf, key_deserializer, value_deserializer, on_commit)
        # confluent-kafka takes the config dict as a single positional arg.
        consumer = Consumer(conf)
        try:
            consumer.subscribe([self.__topic])
            while True:
                msg = consumer.poll(timeout=self.__timeout)
                if msg is None:
                    continue
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        # End of partition is informational, not a failure;
                        # skip it rather than yield/commit an error message.
                        print(f'{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}\n')
                        continue
                    raise KafkaException(msg.error())
                yield {'Topic': msg.topic(),
                       'Value': value_deserializer(msg.value()),
                       'Key': key_deserializer(msg.key()),
                       'Offset': msg.offset(),
                       'Partition': msg.partition()}
                consumer.commit(msg, asynchronous=True)
        finally:
            consumer.close()

    def produce(self, data, conf=None, key_serializer=None,
                value_serializer=None, callback=None):
        """Send (key, value) pairs to the topic.

        :param data: iterable of (key, value) tuples
        :param conf: optional config dict merged over the defaults
        :param key_serializer: callable turning a key into bytes
        :param value_serializer: callable turning a value into bytes
        :param callback: optional delivery-report callback
        """
        conf, key_serializer, value_serializer, callback = self.__generate_producer_properties(
            conf, key_serializer, value_serializer, callback)
        producer = Producer(conf)
        try:
            for key, value in data:
                print(f"Sending message to Topic = {self.__topic}, Key = {key}")
                producer.produce(topic=self.__topic,
                                 key=key_serializer(key),
                                 value=value_serializer(value),
                                 callback=callback)
                # Serve delivery-report callbacks as we go.
                producer.poll(timeout=self.__timeout)
        finally:
            # Block until all queued messages are delivered or have failed.
            producer.flush()
############# producer example #############
# from kafka import Kafka
# import json
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# key_serializer = lambda x: x.encode('utf-8')
# value_serializer = lambda x: json.dumps(x).encode('utf-8')
# data = (('key1', {'Value': 'value1'}), ('key2', {'Value': 'value2'}), ('key3', {'Value': 'value3'}))
#
# k = Kafka(bootstrap_servers, topic)
# k.produce(data, key_serializer=key_serializer, value_serializer=value_serializer)
############# consumer example #############
# from kafka import Kafka
# import json
#
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# group_id = 'my-group'
# value_deserializer = lambda x: json.loads(x.decode('utf-8'))
# key_deserializer = lambda x: x.decode('utf-8')
#
# k = Kafka(bootstrap_servers, topic)
# cur = k.consume(group_id, key_deserializer=key_deserializer, value_deserializer=value_deserializer)
# for msg in cur:
# print(msg)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment