Skip to content

Instantly share code, notes, and snippets.

@iKunalChhabra
Last active February 18, 2023 16:43
Show Gist options
  • Save iKunalChhabra/43ce648e06ccfb606b2150c6bf3ea5aa to your computer and use it in GitHub Desktop.
Save iKunalChhabra/43ce648e06ccfb606b2150c6bf3ea5aa to your computer and use it in GitHub Desktop.
Kafka consumer/producer class in Python
from confluent_kafka import Consumer, Producer, KafkaError, KafkaException
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
class Kafka:
    """Convenience wrapper around confluent_kafka's Consumer and Producer for a single topic.

    Every client created by an instance connects to ``bootstrap_server`` and reads from /
    writes to ``topic``.  Settings registered with :meth:`set_auth` are merged into every
    client configuration.
    """

    # Consumer settings added only when the caller passes no ``conf`` of their own.
    # Auto-commit is disabled because ``consume`` commits each message itself.
    _CONSUMER_DEFAULTS = {'enable.auto.commit': False,
                          'auto.offset.reset': 'earliest'}

    # Producer settings added only when the caller passes no ``conf`` of their own.
    _PRODUCER_DEFAULTS = {'acks': 'all',
                          'retries': 5,
                          'linger.ms': 100,
                          'batch.num.messages': 1_000,
                          'batch.size': 1024 * 64,  # 64KB
                          'compression.codec': 'snappy',
                          'enable.idempotence': True,
                          'delivery.timeout.ms': 1000 * 120,  # 2 minutes
                          'max.in.flight.requests.per.connection': 5}

    def __init__(self, bootstrap_server, topic, timeout=60.0):
        """Bind the wrapper to one cluster and one topic.

        :param bootstrap_server: value used for the ``bootstrap.servers`` client setting.
        :param topic: topic that ``consume`` subscribes to and ``produce`` writes to.
        :param timeout: seconds passed to ``Consumer.poll`` and ``Producer.flush``.
        """
        self.__bootstrap_server = bootstrap_server
        self.__topic = topic
        self.__timeout = timeout
        self.__auth_conf = {}

    def set_auth(self, auth_conf):
        """Register settings (e.g. SASL/SSL options) merged into every client configuration."""
        self.__auth_conf = auth_conf

    @staticmethod
    def __on_commit(err, partitions):
        """Default commit callback: print the commit outcome for every partition."""
        if err:
            print(str(err))
            return
        # Report all partitions (not only the first) and tolerate an empty list.
        for tp in partitions:
            if tp.error:
                print(f"Commit failed for Topic = {tp.topic}, Partition = {tp.partition}: {tp.error}")
            else:
                print(f"Message committed to Topic = {tp.topic}, Partition = {tp.partition}, Offset = {tp.offset}")

    @staticmethod
    def __callback(err, msg):
        """Default delivery-report callback: print the outcome of one produced message."""
        if err:
            print(f'Message delivery failed: {err}')
        else:
            print(f'Message delivered to Topic = {msg.topic()}, Key = {msg.key()}, Partition = {msg.partition()}, Offset = {msg.offset()}')

    @staticmethod
    def __identity(x):
        """Pass-through (de)serializer used when the caller supplies none."""
        return x

    @staticmethod
    def __message_field(kind):
        """Map 'KEY' / 'VALUE' to the matching MessageField; raise ValueError for anything else."""
        if kind == 'KEY':
            return MessageField.KEY
        if kind == 'VALUE':
            return MessageField.VALUE
        raise ValueError(f'Invalid type {kind}')

    @staticmethod
    def __latest_schema_str(client, subject):
        """Return the schema string of the latest version registered under ``subject``."""
        return client.get_latest_version(subject).schema.schema_str

    def __generate_consumer_properties(self, group_id, conf, key_deserializer, value_deserializer, on_commit):
        """Build the consumer configuration and resolve default deserializers.

        Bootstrap servers, group id and auth settings are always included.  With an empty/None
        ``conf`` the ``_CONSUMER_DEFAULTS`` are added; otherwise ``conf`` is layered on top as-is.
        NOTE: in that case the defaults are NOT applied, so e.g. ``enable.auto.commit`` falls
        back to the client library's own default unless ``conf`` sets it.

        :returns: ``(conf, key_deserializer, value_deserializer)``
        """
        base = {'bootstrap.servers': self.__bootstrap_server,
                'group.id': group_id,
                **self.__auth_conf}
        if conf:
            # A fresh dict, so the caller's ``conf`` is never mutated.
            merged = {**base, **conf}
        else:
            merged = {**base, **self._CONSUMER_DEFAULTS}
        # Precedence: explicit ``on_commit`` argument, then an 'on_commit' given inside
        # ``conf``, then the built-in reporter.
        if on_commit is None:
            on_commit = merged.get('on_commit') or self.__on_commit
        merged['on_commit'] = on_commit
        if key_deserializer is None:
            key_deserializer = self.__identity
        if value_deserializer is None:
            value_deserializer = self.__identity
        return merged, key_deserializer, value_deserializer

    def __generate_producer_properties(self, conf, key_serializer, value_serializer, callback):
        """Build the producer configuration and resolve default serializers and callback.

        Bootstrap servers and auth settings are always included.  With an empty/None ``conf``
        the ``_PRODUCER_DEFAULTS`` are added; otherwise ``conf`` is layered on top as-is.

        :returns: ``(conf, key_serializer, value_serializer, callback)``
        """
        base = {'bootstrap.servers': self.__bootstrap_server, **self.__auth_conf}
        if conf:
            merged = {**base, **conf}
        else:
            merged = {**base, **self._PRODUCER_DEFAULTS}
        if value_serializer is None:
            value_serializer = self.__identity
        if key_serializer is None:
            key_serializer = self.__identity
        if callback is None:
            callback = self.__callback
        return merged, key_serializer, value_serializer, callback

    def get_schema_registry_client(self, conf):
        """Return a SchemaRegistryClient built from ``conf``."""
        return SchemaRegistryClient(conf)

    def get_schema(self, schema_registry_conf, subject):
        """Return the schema string of the latest version registered under ``subject``."""
        client = self.get_schema_registry_client(schema_registry_conf)
        return self.__latest_schema_str(client, subject)

    def get_avro_serializer(self, schema_registry_conf, subject, type):
        """Return a one-argument Avro serializer for this topic's key or value.

        :param type: 'KEY' or 'VALUE' (name kept for backward compatibility).
        :raises ValueError: for any other ``type``, before any registry request is made.
        """
        field = self.__message_field(type)
        # One client serves both the schema lookup and the serializer.
        client = self.get_schema_registry_client(schema_registry_conf)
        schema = self.__latest_schema_str(client, subject)
        avro_conf = {'auto.register.schemas': False,
                     'use.latest.version': True}
        avro_ser = AvroSerializer(schema_registry_client=client,
                                  schema_str=schema,
                                  conf=avro_conf)
        return lambda x: avro_ser(x, SerializationContext(self.__topic, field))

    def get_avro_deserializer(self, schema_registry_conf, subject, type):
        """Return a one-argument Avro deserializer for this topic's key or value.

        :param type: 'KEY' or 'VALUE' (name kept for backward compatibility).
        :raises ValueError: for any other ``type``, before any registry request is made.
        """
        field = self.__message_field(type)
        client = self.get_schema_registry_client(schema_registry_conf)
        schema = self.__latest_schema_str(client, subject)
        avro_deser = AvroDeserializer(schema_registry_client=client,
                                      schema_str=schema)
        return lambda x: avro_deser(x, SerializationContext(self.__topic, field))

    def consume(self, group_id, conf=None, key_deserializer=None, value_deserializer=None, on_commit=None):
        """Yield messages from the topic forever as dicts.

        Each yielded dict has the keys 'Topic', 'Value', 'Key', 'Offset' and 'Partition'.  A
        message's offset is committed (asynchronously) only when the caller requests the next
        one.  The consumer is closed when the generator is closed or garbage-collected.

        :raises KafkaException: for any consumer error other than partition EOF.
        """
        conf, key_deserializer, value_deserializer = self.__generate_consumer_properties(
            group_id, conf, key_deserializer, value_deserializer, on_commit)
        consumer = Consumer(conf)
        try:
            consumer.subscribe([self.__topic])
            while True:
                msg = consumer.poll(timeout=self.__timeout)
                if msg is None:
                    continue
                error = msg.error()
                if error:
                    if error.code() == KafkaError._PARTITION_EOF:
                        # End-of-partition is an informational event, not a message:
                        # do not deserialize, yield or commit it.
                        print(f'{msg.topic()} [{msg.partition()}] reached end at offset {msg.offset()}\n')
                        continue
                    raise KafkaException(error)
                yield {'Topic': msg.topic(),
                       'Value': value_deserializer(msg.value()),
                       'Key': key_deserializer(msg.key()),
                       'Offset': msg.offset(),
                       'Partition': msg.partition()}
                consumer.commit(msg, asynchronous=True)
        finally:
            consumer.close()

    def produce(self, data, conf=None, key_serializer=None, value_serializer=None, callback=None):
        """Produce every ``(key, value)`` pair in ``data`` to the topic, then flush.

        :returns: number of messages still undelivered when the flush timeout expired
                  (0 means everything was delivered or failed via the callback).
        """
        conf, key_serializer, value_serializer, callback = self.__generate_producer_properties(
            conf, key_serializer, value_serializer, callback)
        producer = Producer(conf)
        try:
            for key, value in data:
                print(f"Sending message to Topic = {self.__topic}, Key = {key}")
                # Serialize once, outside the retry loop.
                serialized_key = key_serializer(key)
                serialized_value = value_serializer(value)
                while True:
                    try:
                        producer.produce(topic=self.__topic,
                                         key=serialized_key,
                                         value=serialized_value,
                                         callback=callback)
                        break
                    except BufferError:
                        # Local queue is full: serve delivery reports to free space, then retry.
                        producer.poll(1)
                # Serve delivery callbacks for already-acknowledged messages.
                producer.poll(0)
        finally:
            remaining = producer.flush(timeout=self.__timeout)
        if remaining:
            print(f'{remaining} message(s) were still undelivered after flush timeout')
        return remaining
# ############ producer example #############
# from kafka import Kafka
# import json
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# key_serializer = lambda x: x.encode('utf-8')
# value_serializer = lambda x: json.dumps(x).encode('utf-8')
# data = (('key1', {'Value': 'value1'}), ('key2', {'Value': 'value2'}), ('key3', {'Value': 'value3'}))
#
# k = Kafka(bootstrap_servers, topic)
# k.produce(data, key_serializer=key_serializer, value_serializer=value_serializer)
# ############ consumer example #############
# from kafka import Kafka
# import json
#
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# group_id = 'my-group'
# value_deserializer = lambda x: json.loads(x.decode('utf-8'))
# key_deserializer = lambda x: x.decode('utf-8')
#
# k = Kafka(bootstrap_servers, topic)
# cur = k.consume(group_id, key_deserializer=key_deserializer, value_deserializer=value_deserializer)
# for msg in cur:
# print(msg)
# ############ avro producer example #############
# from kafka import Kafka
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
#
# k = Kafka(bootstrap_servers, topic)
#
# schema_registry_conf = {'url': 'http://localhost:8081', 'basic.auth.credentials.source': 'USER_INFO', 'basic.auth.user.info': 'user:password'}
# key_serializer = k.get_avro_serializer(schema_registry_conf, 'first_topic-key', 'KEY')
# value_serializer = k.get_avro_serializer(schema_registry_conf, 'first_topic-value', 'VALUE')
# data = (('key1', {'Value': 'value1'}), ('key2', {'Value': 'value2'}), ('key3', {'Value': 'value3'}))
#
# k.produce(data, key_serializer=key_serializer, value_serializer=value_serializer)
# ############ avro consumer example #############
# from kafka import Kafka
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# group_id = 'my-group'
#
# k = Kafka(bootstrap_servers, topic)
#
# schema_registry_conf = {'url': 'http://localhost:8081', 'basic.auth.credentials.source': 'USER_INFO', 'basic.auth.user.info': 'user:password'}
# key_deserializer = k.get_avro_deserializer(schema_registry_conf, 'first_topic-key', 'KEY')
# value_deserializer = k.get_avro_deserializer(schema_registry_conf, 'first_topic-value', 'VALUE')
#
# cur = k.consume(group_id, key_deserializer=key_deserializer, value_deserializer=value_deserializer)
# for msg in cur:
# print(msg)
# ############ SASL SSL Auth Example #############
# from kafka import Kafka
# import json
#
# bootstrap_servers = 'localhost:9092'
# topic = 'first_topic'
# k = Kafka(bootstrap_servers, topic)
#
# data = (('key1', {'Value': 'value1'}), ('key2', {'Value': 'value2'}), ('key3', {'Value': 'value3'}))
#
# auth_config = {'sasl.mechanism': 'PLAIN',
# 'security.protocol': 'SASL_SSL',
# 'sasl.username': 'user',
# 'sasl.password': 'password'}
# k.set_auth(auth_config)
#
# # The values are dicts, so they must be serialized to bytes before producing.
# k.produce(data,
#           key_serializer=lambda x: x.encode('utf-8'),
#           value_serializer=lambda x: json.dumps(x).encode('utf-8'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment