Skip to content

Instantly share code, notes, and snippets.

@Ugbot
Last active May 18, 2023 19:09
Show Gist options
  • Save Ugbot/b8fd2841047eac7e5c0908281d5857a3 to your computer and use it in GitHub Desktop.
Save Ugbot/b8fd2841047eac7e5c0908281d5857a3 to your computer and use it in GitHub Desktop.
import functools
import types
from io import BytesIO  # stdlib BytesIO; the name `io` below is rebound to avro.io
from typing import List, Dict, Union
from typing import get_args, get_origin, get_type_hints

from avro import schema, datafile, io
# Python primitive types -> Avro primitive type names.
# float maps to "double": Python floats are 64-bit, Avro "float" is 32-bit.
_AVRO_PRIMITIVES = {
    int: 'int',
    float: 'double',
    bool: 'boolean',
    str: 'string',
    bytes: 'bytes',
    type(None): 'null',
}

# typing.Union[...] and, on Python 3.10+, the PEP 604 `A | B` union type.
_UNION_ORIGINS = (Union, getattr(types, 'UnionType', Union))


def avro_json_from_type(typ):
    """Return the Avro schema for a Python type annotation as JSON-compatible data.

    Supported annotations:

    * ``int`` -> ``'int'``, ``float`` -> ``'double'``, ``bool`` -> ``'boolean'``,
      ``str`` -> ``'string'``, ``bytes`` -> ``'bytes'``, ``None`` -> ``'null'``
    * ``List[T]`` / ``list[T]`` -> ``{'type': 'array', 'items': <T>}``
    * ``Dict[str, V]`` / ``dict[str, V]`` -> ``{'type': 'map', 'values': <V>}``
    * ``Union[A, B, ...]`` / ``Optional[A]`` -> ``[<A>, <B>, ...]``

    Args:
        typ: The type annotation to convert.

    Returns:
        A str, dict or list describing the Avro schema, suitable for passing
        to the Avro schema parser or embedding in a larger schema.

    Raises:
        ValueError: if ``typ`` (or a nested type argument) is unsupported, a
            container is missing its type arguments, or a map's key type is
            not ``str`` (Avro map keys are always strings).
    """
    if typ is None:
        typ = type(None)
    origin = get_origin(typ)
    args = get_args(typ)

    if origin is None:
        try:
            return _AVRO_PRIMITIVES[typ]
        except (KeyError, TypeError):  # TypeError: unhashable annotation object
            raise ValueError(f'Unsupported type: {typ}') from None

    if origin is list:
        if len(args) != 1:
            raise ValueError(f'List type needs exactly one item type: {typ}')
        return {'type': 'array', 'items': avro_json_from_type(args[0])}

    if origin is dict:
        if len(args) != 2:
            raise ValueError(f'Dict type needs key and value types: {typ}')
        key_type, value_type = args
        if key_type is not str:
            raise ValueError(f'Avro map keys must be str: {typ}')
        return {'type': 'map', 'values': avro_json_from_type(value_type)}

    if origin in _UNION_ORIGINS:
        # An Avro union is written as a plain JSON array of its branch schemas.
        return [avro_json_from_type(arg) for arg in args]

    raise ValueError(f'Unsupported type: {typ}')


def avro_schema_from_type(typ):
    """Return the parsed Avro ``Schema`` for a Python type annotation.

    See ``avro_json_from_type`` for the supported annotations.

    Raises:
        ValueError: if the annotation is not supported.
    """
    # NOTE(review): SchemaFromJSONData is the avro-python3 API; the `avro`
    # package names it make_avsc_object — confirm which one is installed.
    return schema.SchemaFromJSONData(avro_json_from_type(typ))
def avro_record(typ):
    """Build an Avro record schema from the type annotations of class ``typ``.

    The record is named ``typ.__name__`` and has one field per annotated
    attribute, in annotation order (base-class annotations first). Each
    field's type comes from ``avro_schema_from_type``.

    Args:
        typ: A class whose attributes are declared with type annotations.

    Returns:
        The parsed Avro record ``Schema``.

    Raises:
        ValueError: if a field's annotation is not supported by
            ``avro_schema_from_type``.
        NameError: if a string (forward-reference) annotation cannot be resolved.
    """
    fields = [
        # Embed each field type as JSON data: the schema parser expects
        # str/dict/list input, not already-parsed Schema objects.
        {'name': attr_name, 'type': avro_schema_from_type(attr_type).to_json()}
        for attr_name, attr_type in get_type_hints(typ).items()
    ]
    return schema.SchemaFromJSONData({
        'name': typ.__name__,
        'type': 'record',
        'fields': fields,
    })
def avro_schema(cls):
    """Class decorator that attaches an Avro record schema and binary serialization.

    Sets ``cls.schema`` to the record schema built by ``avro_record(cls)``.
    It also wraps ``cls.serialize``: the wrapped method must return the
    record datum (a dict keyed by field name). The wrapper encodes that datum
    as an Avro object-container file (header plus one data block) and returns
    it as ``bytes``.

    Args:
        cls: The class to decorate; it must define ``serialize``.

    Returns:
        ``cls`` itself, modified in place.
    """
    record_schema = avro_record(cls)  # distinct name: don't shadow the avro `schema` module
    to_datum = cls.serialize

    @functools.wraps(to_datum)
    def serialize(self, *args, **kwargs):
        buffer = BytesIO()  # stdlib BytesIO; `io` in this module is avro.io
        writer = datafile.DataFileWriter(buffer, io.DatumWriter(record_schema), record_schema)
        try:
            # Write the dict produced by the class's own serialize, not the
            # instance itself: Avro record datums are field-name -> value dicts.
            writer.append(to_datum(self, *args, **kwargs))
            writer.flush()  # emit the pending data block into the buffer
            return buffer.getvalue()  # read before close() closes the buffer
        finally:
            writer.close()

    cls.serialize = serialize
    cls.schema = record_schema
    return cls
@avro_schema
class User:
    """A user with name, age, email and address fields.

    ``@avro_schema`` derives the class's Avro schema from the annotations below.
    """

    name: str
    age: int
    email: str
    address: Dict[str, Union[str, int]]

    def __init__(self, name: str, age: int, email: str, address: Dict[str, Union[str, int]]):
        """Store each constructor argument on the attribute of the same name."""
        self.name, self.age, self.email, self.address = name, age, email, address

    def serialize(self):
        """Return the user's fields as a dict keyed by field name, in declaration order."""
        return {field: getattr(self, field) for field in ('name', 'age', 'email', 'address')}
if __name__ == '__main__':
    # Demo only: guarded so importing this module does not build a User and print.
    user_data = {'name': 'Alice', 'age': 30, 'email': 'alice@example.com', 'address': {'street': '123 Main St', 'zip': 12345}}
    user = User(**user_data)
    print(user.schema)
    print(user.serialize())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment