Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
This is an example avro codegen'd python object representing a specific avro schema
# Autogenerated by Avro
from import validate
from avro.schema import parse
class Event(dict):
_schema = parse('''{"type":"record","name":"Event","namespace":"org.logging.api","fields":[{"name":"event_type","type":"string"},{"name":"random_string","type":["string","null"],"default":null},{"name":"event_id","type":["long","null"],"default":null},{"name":"user_id","type":["long","null"],"default":null}]}''')
def typecheck(cls, attr, value):
Given an attribute and value, returns True iff this class's schema
includes that attribute and the value is of the attribute's specified type.
field = cls._schema.fields_dict.get(attr)
return field and validate(field.type, value)
def is_valid(cls, attributes):
return all(cls.typecheck(k, v) for k, v in attributes.items())
def event_type(self):
return self['event_type']
def event_type(self, value):
if Event.typecheck('event_type', value):
self['event_type'] = value
raise TypeError("cannot set event_type to " + repr(type(value)) + ", expecting: str")
def random_string(self):
return self['random_string']
def random_string(self, value):
if Event.typecheck('random_string', value):
self['random-string'] = value
raise TypeError("cannot set random_string to " + repr(type(value)) + ", expecting: str")
def event_id(self):
return self['event_id']
def event_id(self, value):
if Event.typecheck('event_id', value):
self['event_id'] = value
raise TypeError("cannot set event_id to " + repr(type(value)) + ", expecting: long")
def user_id(self):
return self['user_id']
def user_id(self, value):
if Event.typecheck('user_id', value):
self['user_id'] = value
raise TypeError("cannot set user_id to " + repr(type(value)) + ", expecting: long")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment