Skip to content

Instantly share code, notes, and snippets.

@OneCricketeer
Last active January 30, 2019 17:18
Show Gist options
  • Save OneCricketeer/4db67f51fcaa02776340237762950b67 to your computer and use it in GitHub Desktop.
Save OneCricketeer/4db67f51fcaa02776340237762950b67 to your computer and use it in GitHub Desktop.
Confluent-Kafka-Python Avro Values and String Keys
from time import time
from confluent_kafka import avro
from confluent_kafka import Producer
from kafka_utils import bootstrap_servers, topic
from kafka_utils import serialize_avro
from kafka_utils import delivery_report
from model import LogEvent
# key_schema = avro.load('kafka/avro_schemas/key.avsc')
# Avro schema applied to every record *value* produced by this module
# (keys are plain strings, so no key schema is loaded).
value_schema = avro.load('avro_schemas/value.avsc') # TODO: Create avro_schemas folder
# Single module-level producer shared by send_avro() and main().
p = Producer({'bootstrap.servers': bootstrap_servers})
def send_avro(value, key=None, timestamp=None):
    """
    Send value as an Avro-serialized record. The value object must define
    an asdict() method that returns a dict matching the registered schema.

    :param value: class object exposing asdict()
    :param key: <str|bytes> (default: None, so null key)
    :param timestamp: unix millis since epoch (default: None -> current time)
    :raises TypeError: if value does not define asdict()
    """
    # Bug fix: original tested `is not None`, which overwrote any
    # caller-supplied timestamp and left a missing one as None.
    if timestamp is None:
        # NOTE(review): time() is epoch *seconds*, but the docstring says
        # millis — confirm the intended unit (int(time() * 1000) for millis).
        timestamp = int(time())
    if not hasattr(value, 'asdict'):
        raise TypeError("asdict() is not defined for {}".format(type(value)))
    value_payload = serialize_avro(topic, value_schema, value.asdict(), is_key=False)
    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    # Bug fix: timestamp was computed but never forwarded to produce().
    p.produce(topic, key=key, value=value_payload,
              timestamp=timestamp, callback=delivery_report)
def main():
    """Produce 100 Avro-encoded LogEvent records to Kafka, then flush."""
    for i in range(100):
        # Trigger any available delivery report callbacks from previous produce() calls
        p.poll(0)
        now = int(time())
        event = LogEvent.Value("test", "Hello, Python! {}".format(i), timestamp=now)
        send_avro(event, key="record={}".format(i), timestamp=now)
    flush()
def flush():
    # Block until every queued message has been delivered or has failed
    # permanently; delivery_report fires once per outstanding record.
    p.flush()
if __name__ == '__main__':
main()
import os
from confluent_kafka.avro import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer as AvroSerializer
# Connection settings, overridable via environment variables.
bootstrap_servers = os.environ.get('BOOTSTRAP_SERVERS', 'localhost:9092')
topic = os.environ.get('KAFKA_TOPIC', 'my_topic')
# Module-level Schema Registry client shared by the helpers below.
schema_registry = CachedSchemaRegistryClient(os.environ.get('SCHEMA_REGISTRY', 'http://localhost:8081'))
def load_schema_from_registry(subject, is_key=False, version_id='latest'):
    """
    Fetch the Avro schema registered for a topic from the Schema Registry.

    :param subject: topic name; '-key' or '-value' is appended per the
        registry's subject naming convention
    :param is_key: True to load the key schema, False (default) for the value schema
    :param version_id: kept for interface compatibility; the get_schema()
        call used here does not accept a version argument, so it is unused
    :return: the schema returned by the registry client
    """
    suffix = '-key' if is_key else '-value'
    # Bug fix: the original formatted "{}-{}" with a suffix that already
    # starts with '-', producing e.g. "my_topic--value", and passed
    # version_id as a surplus .format() argument that str.format silently
    # ignored. Also dropped the pointless `try: ... except Exception as e:
    # raise e` wrapper and the unneeded `global` (the name is only read).
    return schema_registry.get_schema("{}{}".format(subject, suffix))
def delivery_report(err, msg, verbose=False):
    """ Called once for each message produced to indicate delivery result.
    Triggered by poll() or flush().

    :param err: KafkaError on failure, None on success
    :param msg: the produced Message
    :param verbose: kept for interface compatibility; failures are now
        always reported regardless of this flag
    """
    if err is not None:
        # Bug fix: originally `err is not None and verbose`, so a failure
        # with verbose=False fell through to the else branch and was
        # misreported as "Message delivered".
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
# ================
# Serializers
# ================
def serialize_str(s):
    """Encode a text key to its UTF-8 byte representation for Kafka."""
    return bytes(s, 'utf-8')
# Shared Avro serializer bound to the module-level Schema Registry client;
# serialize_avro(topic, schema, record, is_key) is the bound method alias
# imported by the producer script.
avro_serializer = AvroSerializer(schema_registry)
serialize_avro = avro_serializer.encode_record_with_schema # extract function definition
from time import time
class LogEvent:
    """
    An Avro record destined for Kafka.

    key: str
    value: LogEvent.Value
    """

    class Value:
        """Plain-Python mirror of the Avro value schema."""

        def __init__(self, description, message, timestamp=None):
            # Kafka didn't carry timestamps in the record before broker 0.10,
            # so the event keeps its own; default to "now" in epoch seconds.
            self.timestamp = timestamp if timestamp is not None else int(time())
            # Traces where the message came from.
            self.description = description
            self.message = message

        def asdict(self):
            """Return the fields as a dict matching the Avro schema layout."""
            return dict(
                timestamp=self.timestamp,
                description=self.description,
                message=self.message,
            )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment