Skip to content

Instantly share code, notes, and snippets.

@wirelessr
Created March 29, 2023 07:36
Show Gist options
  • Save wirelessr/f8680b319e9a98af1440c68237182015 to your computer and use it in GitHub Desktop.
Save wirelessr/f8680b319e9a98af1440c68237182015 to your computer and use it in GitHub Desktop.
Real Experiment of AVRO and Schema Registry
from copy import deepcopy
import json
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField
# Host on which the Schema Registry is reachable (port 8081 is used below).
KAFKA_HOST = 'localhost'

schema_registry_conf = {
    'url': f'http://{KAFKA_HOST}:8081',
}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

# The returned subject list is not used by the script; the call is kept as-is
# so the registry is still queried once before the experiment starts.
schema_registry_client.get_subjects()

# Compatibility level under test. Levels the experiment can be re-run with:
# ['BACKWARD', 'FORWARD', 'FULL', 'BACKWARD_TRANSITIVE', 'FORWARD_TRANSITIVE', 'FULL_TRANSITIVE']
comp = 'BACKWARD'

# The topic name and the Avro namespace share one value derived from the level.
namespace = f'example.test.{comp}'
topic = namespace

# No subject name is passed here; per the client documentation this applies
# the level as the registry-wide default (confirm before pointing this at a
# shared registry).
schema_registry_client.set_compatibility(level=comp)
# Successive revisions of the ``Person`` record used by the experiment.
# Every revision keeps the same record name and namespace; only the field
# list changes from one version to the next.

# ver0: a single string field ``name`` without a default.
ver0 = {
    'type': 'record',
    'name': 'Person',
    'namespace': namespace,
    'fields': [
        {'name': 'name', 'type': 'string'}
    ]
}

# ver1: ``name`` gains a default value.
ver1 = {
    'type': 'record',
    'name': 'Person',
    'namespace': namespace,
    'fields': [
        {'name': 'name', 'type': 'string', 'default': '<DEPRECATED>'},
    ]
}

# ver2: adds ``person_name``, a nested ``Name`` record. Both the nested
# fields and the ``person_name`` field itself carry defaults.
ver2 = {
    'type': 'record',
    'name': 'Person',
    'namespace': namespace,
    'fields': [
        {'name': 'name', 'type': 'string', 'default': '<DEPRECATED>'},
        {
            'name': 'person_name',
            'type': {
                'name': 'Name',
                'type': 'record',
                'fields': [
                    {'name': 'first_name', 'type': 'string', 'default': '<NOT_IN_USE>'},
                    {'name': 'last_name', 'type': 'string', 'default': '<NOT_IN_USE>'}
                ]
            },
            'default': {'first_name': '<??>', 'last_name': '<??>'}
        }
    ]
}

# ver3: the top-level ``name`` field is dropped; ``person_name`` is unchanged.
ver3 = {
    'type': 'record',
    'name': 'Person',
    'namespace': namespace,
    'fields': [
        {
            'name': 'person_name',
            'type': {
                'name': 'Name',
                'type': 'record',
                'fields': [
                    {'name': 'first_name', 'type': 'string', 'default': '<NOT_IN_USE>'},
                    {'name': 'last_name', 'type': 'string', 'default': '<NOT_IN_USE>'}
                ]
            },
            'default': {'first_name': '<??>', 'last_name': '<??>'}
        }
    ]
}

# ver4: every default is removed from ``person_name`` and its nested fields.
ver4 = {
    'type': 'record',
    'name': 'Person',
    'namespace': namespace,
    'fields': [
        {
            'name': 'person_name',
            'type': {
                'name': 'Name',
                'type': 'record',
                'fields': [
                    {'name': 'first_name', 'type': 'string'},
                    {'name': 'last_name', 'type': 'string'}
                ]
            }
        }
    ]
}

# ver5: same as ver4, plus the alias ``name`` on the ``person_name`` field.
ver5 = {
    'type': 'record',
    'name': 'Person',
    'namespace': namespace,
    'fields': [
        {
            'name': 'person_name',
            'type': {
                'name': 'Name',
                'type': 'record',
                'fields': [
                    {'name': 'first_name', 'type': 'string'},
                    {'name': 'last_name', 'type': 'string'}
                ]
            },
            'aliases': ['name']
        }
    ]
}
def serialize_and_deserialize(data, schema, recv_schema=None):
    """Round-trip ``data`` through Avro and return the decoded value.

    ``data`` is encoded with an ``AvroSerializer`` built from ``schema``
    (the writer schema, given as a dict) and then decoded with an
    ``AvroDeserializer``. Both steps use the module-level
    ``schema_registry_client`` and a VALUE-field context for ``topic``.

    Args:
        data: The record to encode.
        schema: Writer schema as a dict; it is passed on as a JSON string.
        recv_schema: Optional reader schema as a dict. When it is falsy the
            deserializer is built without a schema string (per the
            confluent-kafka docs it then reads with the writer's schema).

    Returns:
        Whatever the deserializer yields for the encoded payload.
    """
    writer = AvroSerializer(schema_registry_client, json.dumps(schema))
    encoded = writer(data, SerializationContext(topic, MessageField.VALUE))

    # Only hand over a reader schema when the caller supplied one.
    reader_schema_args = (json.dumps(recv_schema),) if recv_schema else ()
    reader = AvroDeserializer(schema_registry_client, *reader_schema_args)

    return reader(encoded, SerializationContext(topic, MessageField.VALUE))
# Sample payloads, one per schema revision that they are serialized with.
# Each payload is an independent dict so none of them share nested state.

# Shape for ver0 / ver1: only the flat ``name`` string.
user1 = {'name': 'John Smith'}

# Shape for ver2: the flat ``name`` together with the structured name.
user2 = {
    'name': 'John Smith',
    'person_name': {'first_name': 'John', 'last_name': 'Smith'},
}

# Shape for ver3, ver4 and ver5: only the structured name.
user3 = {
    'person_name': {'first_name': 'John', 'last_name': 'Smith'},
}
user4 = {
    'person_name': {'first_name': 'John', 'last_name': 'Smith'},
}
user5 = {
    'person_name': {'first_name': 'John', 'last_name': 'Smith'},
}
# Round 1: each payload is written with its own schema and read back without
# an explicit reader schema.
print("Align Producer testing...")
for record, writer_schema in (
    (user1, ver0),
    (user1, ver1),
    (user2, ver2),
    (user3, ver3),
    (user4, ver4),
    (user5, ver5),
):
    print(serialize_and_deserialize(record, writer_schema))

# Round 2: every writer schema from ver1 onwards is read back with ver1.
print("BACKWARD testing...")
for record, writer_schema in (
    (user1, ver1),
    (user2, ver2),
    (user3, ver3),
    (user4, ver4),
    (user5, ver5),
):
    print(serialize_and_deserialize(record, writer_schema, ver1))

# Round 3: the reader is always exactly one revision behind the writer.
print("Late One Ver testing...")
for record, writer_schema, reader_schema in (
    (user1, ver1, ver0),
    (user2, ver2, ver1),
    (user3, ver3, ver2),
    (user4, ver4, ver3),
    (user5, ver5, ver4),
):
    print(serialize_and_deserialize(record, writer_schema, reader_schema))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment