Skip to content

Instantly share code, notes, and snippets.

@asdaraujo
Last active March 16, 2023 19:37
Show Gist options
  • Save asdaraujo/8366763b4022fe346e989dcfa691abff to your computer and use it in GitHub Desktop.
Save asdaraujo/8366763b4022fe346e989dcfa691abff to your computer and use it in GitHub Desktop.
import avro.schema
import io
import os
import requests
from avro.io import DatumReader, BinaryDecoder
from cachetools import TTLCache
from kafka import KafkaConsumer
def _env_list(name, default):
    """Return environment variable *name* split on commas, or a copy of *default* when unset or blank."""
    items = [item.strip() for item in os.environ.get(name, "").split(",") if item.strip()]
    return items if items else list(default)


def _env_flag(name, default):
    """Return environment variable *name* as a boolean, or *default* when unset or blank.

    "1", "true", "yes" and "on" (case-insensitive) mean True; any other
    non-blank value means False.
    """
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    return raw.lower() in ("1", "true", "yes", "on")


# Kafka bootstrap brokers (override with a comma-separated KAFKA_BROKERS).
BROKERS = _env_list("KAFKA_BROKERS", ['cdp.52.33.201.179.nip.io:9092'])
# Kafka topics to consume (override with a comma-separated KAFKA_TOPICS).
TOPICS = _env_list("KAFKA_TOPICS", ['machine-data-nifi'])
# Schema Registry base URL (override with SCHEMA_REGISTRY_URL).
SCHEMA_REGISTRY_URL = os.environ.get("SCHEMA_REGISTRY_URL") or "http://cdp.52.33.201.179.nip.io:7788"
# True when message values start with a Schema Registry serializer header
# (override with PRODUCER_USES_SCHEMA_REGISTRY=true/false).
PRODUCER_USES_SCHEMA_REGISTRY = _env_flag("PRODUCER_USES_SCHEMA_REGISTRY", True)
# Local Avro reader schema file (override with READER_SCHEMA_FILE).
# When the producer uses Schema Registry this file is optional, because the
# schema can be fetched from the registry; otherwise it must exist.
# If the file exists, its schema is used and the registry is not queried.
READER_SCHEMA_FILE = os.environ.get("READER_SCHEMA_FILE") or "machine.avsc"
consumer = KafkaConsumer(bootstrap_servers=BROKERS)
consumer.subscribe(TOPICS)
class SchemaRegistryClient(object):
    """Minimal client for the Schema Registry REST API used by this consumer.

    Schema names and parsed Avro schemas are memoized in TTL caches, so
    repeated lookups of the same ids do not hit the registry for every message.
    """

    # Schema metadata addressed by metadata id; the response carries the schema name.
    SCHEMA_METADATA = "/api/v1/schemaregistry/schemasById/{}"
    # A version of a schema addressed by schema name (version or 'latest').
    SCHEMA_VERSION = "/api/v1/schemaregistry/schemas/{}/versions/{}"
    # A schema version addressed directly by its version id.
    SCHEMA_VERSION_BY_ID = "/api/v1/schemaregistry/schemas/versionsById/{}"

    def __init__(self, url, cache_size=100, ttl_secs=3600, timeout=30):
        """Create a client for the registry at base URL *url*.

        :param url: registry base URL, e.g. "http://host:7788"; a trailing slash is ignored.
        :param cache_size: maximum number of entries in each cache.
        :param ttl_secs: seconds a cached entry stays valid.
        :param timeout: seconds to wait for each HTTP request (None waits forever).
        """
        self.base_url = url.rstrip("/")
        self.timeout = timeout
        self.schema_name_cache = TTLCache(maxsize=cache_size, ttl=ttl_secs)
        self.schema_cache = TTLCache(maxsize=cache_size, ttl=ttl_secs)

    def _get_json(self, path, what):
        """GET ``base_url + path`` and return the decoded JSON body.

        :param what: description used in the error message.
        :raises RuntimeError: if the registry answers with a non-OK status.
        """
        r = requests.get(self.base_url + path, timeout=self.timeout)
        if r.status_code != requests.codes.ok:
            raise RuntimeError("Failed to retrieve {} (Error: {}, Message: {})".format(what, r.status_code, r.text))
        return r.json()

    def get_schema_name(self, metadata_id):
        """Return the schema name registered under schema metadata id *metadata_id*.

        :raises RuntimeError: if the registry lookup fails.
        """
        # Read once: a TTL entry may expire between an `in` check and a lookup.
        name = self.schema_name_cache.get(metadata_id)
        if name is None:
            metadata = self._get_json(self.SCHEMA_METADATA.format(metadata_id), "schema name")
            name = metadata['schemaMetadata']['name']
            self.schema_name_cache[metadata_id] = name
        return name

    def get_schema(self, metadata_id=None, version_id=None):
        """Return the parsed Avro schema for a schema reference.

        With *metadata_id*, the schema name is resolved first and *version_id*
        is used as the version in the by-name path (default 'latest').
        Without it, *version_id* is looked up through the by-version-id path.

        :raises ValueError: if neither *metadata_id* nor *version_id* is given.
        :raises RuntimeError: if a registry lookup fails.
        """
        if metadata_id is None and version_id is None:
            raise ValueError("Either metadata_id or version_id must be given")
        if version_id is None:
            version_id = 'latest'
        identifier = (metadata_id, version_id)
        schema = self.schema_cache.get(identifier)
        if schema is None:
            # Only build the path (which may need a name lookup) on a cache miss.
            if metadata_id is not None:
                path = self.SCHEMA_VERSION.format(self.get_schema_name(metadata_id), version_id)
            else:
                path = self.SCHEMA_VERSION_BY_ID.format(version_id)
            j = self._get_json(path, "schema")
            schema = avro.schema.parse(j['schemaText'])
            self.schema_cache[identifier] = schema
            print("Fetched version {} of schema {} from Schema Registry".format(j['version'], j['name']))
        return schema
def _load_reader_schema(path):
    """Return the Avro schema parsed from *path*, or None if the file does not exist."""
    if not os.path.exists(path):
        return None
    print("Using schema from local file {}".format(path))
    # Read inside a with-block so the file handle is closed promptly.
    with open(path, "rb") as schema_file:
        return avro.schema.parse(schema_file.read())


def _read_uint(value, start, size):
    """Return the unsigned big-endian integer stored in value[start:start + size].

    Raises RuntimeError if *value* is too short, rather than silently decoding
    a truncated id.
    """
    end = start + size
    if len(value) < end:
        raise RuntimeError("Message too short for Schema Registry header: {} bytes, expected at least {}".format(len(value), end))
    return int.from_bytes(value[start:end], byteorder='big', signed=False)


def _split_registry_header(value):
    """Split a Schema Registry-framed message value into its schema reference and payload.

    Returns ``(metadata_id, version_id, payload)``. Ids that the protocol does
    not carry are None. For protocol 1, version_id is the version that follows
    the metadata id. For the other protocols it is a schema version id.

    :raises RuntimeError: for an empty value, an unknown protocol id, or a
        value shorter than its protocol header.
    """
    if not value:
        raise RuntimeError("Empty message value; expected a Schema Registry protocol header")
    protocol_id = value[0]
    metadata_id = None
    if protocol_id == 0:  # Confluent protocol
        # The registry's Confluent-compatible serdes writes the schema *version* id here.
        version_id = _read_uint(value, 1, 4)
        header_len = 5
    elif protocol_id == 1:  # Schema metadata id and version protocol
        metadata_id = _read_uint(value, 1, 8)
        version_id = _read_uint(value, 9, 4)
        header_len = 13
    elif protocol_id == 2:  # Schema version id as long protocol
        version_id = _read_uint(value, 1, 8)
        header_len = 9
    elif protocol_id == 3:  # Schema version id as int protocol
        version_id = _read_uint(value, 1, 4)
        header_len = 5
    else:
        raise RuntimeError("Unimplemented protocol %s. See https://github.com/hortonworks/registry/blob/master/docs/serdes.rst" % (protocol_id,))
    return metadata_id, version_id, value[header_len:]


reader_schema = _load_reader_schema(READER_SCHEMA_FILE)
sr = SchemaRegistryClient(SCHEMA_REGISTRY_URL)
for message in consumer:
    if message.value is None:
        # Tombstone record: there is no payload to decode.
        continue
    schema = reader_schema
    if PRODUCER_USES_SCHEMA_REGISTRY:
        metadata_id, version_id, payload = _split_registry_header(message.value)
        # A local reader schema takes precedence over the registry.
        if schema is None:
            schema = sr.get_schema(metadata_id, version_id)
    else:
        payload = message.value
    if schema is None:
        raise RuntimeError("Schema must not be None")
    decoder = BinaryDecoder(io.BytesIO(payload))
    reader = DatumReader(schema)
    print(reader.read(decoder))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment