# Last active March 16, 2023 19:37
import io
import os

import avro.schema
import requests
from avro.io import BinaryDecoder, DatumReader
from cachetools import TTLCache
from kafka import KafkaConsumer
# Kafka bootstrap servers, as "host[:port]" strings (placeholder: fill in).
BROKERS = ['']
# Kafka topics to consume.
TOPICS = ['machine-data-nifi']
# Schema Registry base URL, without a trailing slash, since the client appends
# paths starting with "/" to it (placeholder: fill in).
SCHEMA_REGISTRY_URL = ''
# Optional local Avro reader schema file.
# If the producer uses a Schema Registry serializer the file is optional, since
# the client can retrieve the schema from the registry; otherwise it must be provided.
# If the file is provided, its schema is used rather than the one from the registry.
READER_SCHEMA_FILE = "machine.avsc"
# Pass TOPICS at construction time. Without any topics the consumer has no
# subscription, and the message loop cannot receive anything.
consumer = KafkaConsumer(*TOPICS, bootstrap_servers=BROKERS)
class SchemaRegistryClient(object):
    """Small client for the ``/api/v1/schemaregistry`` REST API.

    Looks up Avro schemas in one of two ways:

    * by schema-metadata id, optionally with a version number;
    * by schema-version id.

    Two TTL caches are kept: metadata id -> schema name, and the parsed
    schemas themselves. This means consuming many messages does not cost one
    HTTP round trip per message.
    """

    # Schema metadata (including the schema name) by metadata id.
    SCHEMA_METADATA = "/api/v1/schemaregistry/schemasById/{}"
    # A given version (number or 'latest') of a schema, addressed by schema name.
    SCHEMA_VERSION = "/api/v1/schemaregistry/schemas/{}/versions/{}"
    # A schema version addressed by its version id.
    SCHEMA_VERSION_BY_ID = "/api/v1/schemaregistry/schemas/versionsById/{}"

    def __init__(self, url, cache_size=100, ttl_secs=3600, timeout_secs=30):
        """Create a client.

        :param url: Registry base URL. The request paths above (which start
            with ``/``) are appended to it verbatim.
        :param cache_size: Maximum number of entries in each cache.
        :param ttl_secs: Lifetime of a cache entry in seconds. Expiry is what
            lets a cached ``latest`` lookup eventually see newer versions.
        :param timeout_secs: Timeout in seconds passed to ``requests.get``, so
            an unresponsive registry does not hang the consumer forever.
        """
        self.base_url = url
        self.timeout_secs = timeout_secs
        self.schema_name_cache = TTLCache(maxsize=cache_size, ttl=ttl_secs)
        self.schema_cache = TTLCache(maxsize=cache_size, ttl=ttl_secs)

    def _fetch_json(self, path, what):
        """GET ``base_url + path`` and return the decoded JSON body.

        :param path: Request path, appended to the base URL.
        :param what: Human-readable resource name used in the error message.
        :raises RuntimeError: if the registry answers with a status other than 200.
        """
        r = requests.get(self.base_url + path, timeout=self.timeout_secs)
        if r.status_code != 200:
            raise RuntimeError("Failed to retrieve {} (Error: {}, Message: {})".format(what, r.status_code, r.text))
        return r.json()

    def get_schema_name(self, metadata_id):
        """Return the schema name registered under ``metadata_id`` (cached).

        :raises RuntimeError: if the registry lookup fails.
        """
        # Read once via .get() instead of "in" followed by indexing. A TTL
        # entry could expire between those two calls and raise KeyError.
        name = self.schema_name_cache.get(metadata_id)
        if name is None:
            j = self._fetch_json(self.SCHEMA_METADATA.format(metadata_id), "schema name")
            name = j['schemaMetadata']['name']
            self.schema_name_cache[metadata_id] = name
        return name

    def get_schema(self, metadata_id=None, version_id=None):
        """Return the parsed Avro schema for the given ids (cached).

        * ``metadata_id`` given: resolve the schema name, then fetch version
          ``version_id`` of it. When ``version_id`` is None, ``latest`` is
          fetched instead.
        * otherwise: fetch the schema version whose id is ``version_id``.

        Ids are compared with ``is None``, so an id of 0 is still honoured.

        :raises RuntimeError: if a registry request fails.
        """
        if version_id is None:
            version_id = 'latest'
        identifier = (metadata_id, version_id)
        schema = self.schema_cache.get(identifier)
        if schema is None:
            # Only build the path (which may require a name lookup over HTTP)
            # on a cache miss.
            if metadata_id is not None:
                path = self.SCHEMA_VERSION.format(self.get_schema_name(metadata_id), version_id)
            else:
                path = self.SCHEMA_VERSION_BY_ID.format(version_id)
            j = self._fetch_json(path, "schema")
            schema = avro.schema.parse(j['schemaText'])
            self.schema_cache[identifier] = schema
            print("Fetched version {} of schema {} from Schema Registry".format(j['version'], j['name']))
        return schema
# Load the optional local reader schema. When present, the message loop uses it
# for every message instead of fetching schemas from the registry.
if os.path.exists(READER_SCHEMA_FILE):
    print("Using schema from local file {}".format(READER_SCHEMA_FILE))
    # Context manager so the file handle is closed right after parsing.
    with open(READER_SCHEMA_FILE, "rb") as schema_file:
        reader_schema = avro.schema.parse(schema_file.read())
else:
    reader_schema = None
sr = SchemaRegistryClient(SCHEMA_REGISTRY_URL)
def parse_message_header(value):
    """Split a Schema-Registry-framed Kafka message value into ids and payload.

    The first byte is a protocol id that selects the header layout. All header
    integers are unsigned big-endian:

    * 0 - Confluent protocol: 4-byte id, treated here as a schema metadata id.
      NOTE(review): Confluent-compatible serializers may write a schema
      *version* id here - confirm against the producer.
    * 1 - 8-byte schema metadata id followed by a 4-byte schema version number.
    * 2 - 8-byte schema version id.
    * 3 - 4-byte schema version id.

    :param value: Raw message value (bytes).
    :returns: ``(metadata_id, version_id, payload)``. An id the protocol does
        not carry is ``None``. ``payload`` is the bytes after the header.
    :raises RuntimeError: for an empty value, an unknown protocol id, or a
        value shorter than its protocol's header.
    """
    if not value:
        raise RuntimeError("Empty message value")
    protocol_id = value[0]
    # Total header size (protocol byte + id fields) per supported protocol.
    header_lengths = {0: 5, 1: 13, 2: 9, 3: 5}
    if protocol_id not in header_lengths:
        raise RuntimeError("Unimplemented protocol %s" % (protocol_id,))
    header_len = header_lengths[protocol_id]
    if len(value) < header_len:
        # int.from_bytes on a truncated slice would silently yield a wrong id.
        raise RuntimeError("Message too short for protocol %s header (%d bytes)" % (protocol_id, len(value)))

    def uint(start, end):
        return int.from_bytes(value[start:end], byteorder='big', signed=False)

    metadata_id = None
    version_id = None
    if protocol_id == 0:  # Confluent protocol
        metadata_id = uint(1, 5)
    elif protocol_id == 1:  # Schema metadata id and version protocol
        metadata_id = uint(1, 9)
        version_id = uint(9, 13)
    elif protocol_id == 2:  # Schema version id as long protocol
        version_id = uint(1, 9)
    else:  # protocol_id == 3: schema version id as int protocol
        # TODO: Handle the case where schema id is larger than max integer value
        version_id = uint(1, 5)
    return metadata_id, version_id, value[header_len:]


if __name__ == "__main__":
    # Consume forever: strip the registry header, resolve the schema, then
    # decode and print each Avro record.
    for message in consumer:
        if message.value is None:
            # Record with a null value (e.g. a tombstone): nothing to decode.
            continue
        metadata_id, version_id, payload = parse_message_header(message.value)
        # A local reader schema, if one was loaded, takes precedence over the registry.
        schema = reader_schema
        if schema is None:
            schema = sr.get_schema(metadata_id, version_id)
        decoder = BinaryDecoder(io.BytesIO(payload))
        reader = DatumReader(schema)
        record = reader.read(decoder)
        print(record)
