kafka-confluent-python implementation example
# config.py -- shared connection settings plus consumer/producer factories.
import certifi
from dynaconf import settings
from confluent_kafka.avro import AvroProducer
from confluent_kafka import Consumer

BASE_CONFIG = {
    'bootstrap.servers': settings.KAFKA_PRIMARY_BOOTSTRAP_SERVERS,
    'group.id': 'integrated-tests',
    'client.id': 'integrated-tests',
    'security.protocol': settings.KAFKA_PRIMARY_SECURITY_PROTOCOL,
    'sasl.mechanisms': settings.get('KAFKA_PRIMARY_SASL_MECHANISM', 'PLAIN'),
    'sasl.username': settings.KAFKA_PRIMARY_KEY,
    'sasl.password': settings.KAFKA_PRIMARY_SECRET,
    'ssl.endpoint.identification.algorithm': 'https',
    'ssl.ca.location': certifi.where(),
    'auto.offset.reset': 'earliest',
}

AVRO_SCHEMA_CONFIG = {
    'schema.registry.url': settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_ENDPOINT,
    'schema.registry.basic.auth.credentials.source': 'USER_INFO',
    'schema.registry.basic.auth.user.info': '{}:{}'.format(
        settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_KEY,
        settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_SECRET
    )
}

PRODUCER_SCHEMA_CONFIG = {**BASE_CONFIG, **AVRO_SCHEMA_CONFIG}


def setup_consumer():
    return Consumer(BASE_CONFIG)


def setup_producer(schema_value):
    producer_config = PRODUCER_SCHEMA_CONFIG.copy()
    producer_config['on_delivery'] = delivery_check
    avro_producer = AvroProducer(
        producer_config,
        default_value_schema=schema_value
    )
    return avro_producer


def delivery_check(err, msg):
    """
    Reports the failure or success of a message delivery.

    Args:
        err (KafkaError): The error that occurred, or None on success.
        msg (Message): The message that was produced or failed.
    """
    if err is not None:
        raise ValueError("Failed to execute command: {}".format(err))
"""
Module for consuming messages on a kafka cluster + kafka schema register.
Since the AvroConsumer from the confluent lib does not work out o the box,
its required to do the decoding manually.
Luckly someone already pass throught this pain and did a blog post about this.
https://mlnotetaking.com/post/fixing-kafka-string-key-and-avro-value-python
"""
import time
import logging
import avro.schema
from config import setup_consumer
from utils import unpack, fetch_schema, msg_sanity_check
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.DEBUG)

def event_polling(topic, key_value, msg_value, qty_msgs):
    """
    Poll the Kafka stream and validate each message's key field and
    value before returning the collected events.
    """
    consumer = setup_consumer()
    schema_str_value = fetch_schema(topic)
    value_schema = avro.schema.Parse(schema_str_value)
    consumer.subscribe([topic])
    msgs = []
    timeout = time.time() + 20  # poll for at most 20 seconds
    while timeout >= time.time():
        raw_msg = consumer.poll(1)
        if not msg_sanity_check(raw_msg):
            continue
        msg = unpack(raw_msg.value(), value_schema)
        if msg[key_value] == msg_value:
            msgs.append(msg)
            if len(msgs) == qty_msgs:
                break
    consumer.close()  # close before raising so the consumer is not leaked
    if len(msgs) < qty_msgs:
        raise Exception('Expected {} events, got {}'.format(qty_msgs, len(msgs)))
    return msgs
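
# A hedged usage sketch of the poller; the topic name, field name and expected
# value below are hypothetical placeholders. This raises if fewer than
# qty_msgs matching events arrive within the 20-second polling window.
events = event_polling(
    topic='user-events',
    key_value='user_id',
    msg_value='abc-123',
    qty_msgs=3,
)
for event in events:
    print(event)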

# producer module
import avro.schema
from utils import fetch_schema
from config import setup_producer


def command_producer(topic, msg):
    """
    Fetch the schema, create the producer instance and send the command
    using AvroProducer.
    """
    schema_str_value = fetch_schema(topic)
    value_schema = avro.schema.Parse(schema_str_value)
    producer = setup_producer(value_schema)
    producer.produce(topic=topic, value=msg)
    producer.flush()
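
# Putting both halves together: a round-trip sketch for an integration test,
# assuming both modules above are importable. The topic and message fields are
# hypothetical and must match the Avro schema registered for the topic's
# '-value' subject.
command = {'user_id': 'abc-123', 'action': 'activate'}
command_producer('user-commands', command)
received = event_polling('user-commands', 'user_id', 'abc-123', 1)
assert received[0]['action'] == 'activate'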

# utils.py
import io
import logging

import requests
from dynaconf import settings
from avro.io import BinaryDecoder, DatumReader

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.DEBUG)


def fetch_schema(subject):
    """
    Fetch the latest value schema from the Confluent Schema Registry.
    Subjects for topic values are registered with a '-value' suffix
    on the topic name.
    """
    subject = subject + '-value'
    url = "{0}/subjects/{1}/versions/latest/schema".format(
        settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_ENDPOINT,
        subject
    )
    response = requests.get(
        url=url,
        auth=(
            settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_KEY,
            settings.KAFKA_PRIMARY_SCHEMA_REGISTRY_SECRET
        ),
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"}
    )
    return response.text

def msg_sanity_check(msg):
    """Return True only for a real, error-free message."""
    if msg is None:
        return False
    if msg.error() is not None:
        LOGGER.warning("Msg error: %s", msg.error())
        return False
    return True

def unpack(string, schema):
    """
    Decode a raw Avro payload. Confluent framing prepends a header
    (magic byte + 4-byte schema id) before the Avro body, so try
    successive start offsets until the reader accepts the buffer.
    """
    reader = DatumReader(schema)
    for position in range(0, 11):
        try:
            decoder = BinaryDecoder(io.BytesIO(string[position:]))
            decoded_msg = reader.read(decoder)
            return decoded_msg
        except AssertionError:
            continue
    raise Exception('Msg cannot be decoded. MSG: {}.'.format(string))
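
# As an alternative to brute-forcing the decode offset, a minimal sketch that
# strips the standard Confluent wire-format header directly: one zero magic
# byte followed by a 4-byte big-endian schema id, then the Avro-encoded body.
# This assumes every message on the topic uses that framing; unpack_framed is
# a hypothetical helper, not part of the original gist.
import struct


def unpack_framed(payload, schema):
    magic, schema_id = struct.unpack('>bI', payload[:5])
    if magic != 0:
        raise ValueError('Unexpected magic byte: {}'.format(magic))
    decoder = BinaryDecoder(io.BytesIO(payload[5:]))
    return DatumReader(schema).read(decoder)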