public
Created

This is an example of an Avro code-generated Python object representing a specific Avro schema.

  • Download Gist
event_codegen.py
Python
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
#
# Autogenerated by Avro
#
# DO NOT EDIT DIRECTLY
#
from avro.io import validate
from avro.schema import parse
 
 
class Event(dict):
    '''
    Dict-backed record for the Avro schema ``org.logging.api.Event``.

    Each schema field is exposed as a property whose setter validates the
    value against the field's Avro type before storing it under the field
    name in the underlying dict. Getters are plain dict lookups, so reading
    a field that has never been set raises KeyError.
    '''

    _schema = parse('''{"type":"record","name":"Event","namespace":"org.logging.api","fields":[{"name":"event_type","type":"string"},{"name":"random_string","type":["string","null"],"default":null},{"name":"event_id","type":["long","null"],"default":null},{"name":"user_id","type":["long","null"],"default":null}]}''')

    @classmethod
    def typecheck(cls, attr, value):
        '''
        Given an attribute and value, returns True iff this class's schema
        includes that attribute and the value is of the attribute's specified type.
        '''
        field = cls._schema.fields_dict.get(attr)
        # Test for None explicitly so an unknown attribute yields False
        # rather than leaking None out of an `and` expression.
        return field is not None and validate(field.type, value)

    @classmethod
    def is_valid(cls, attributes):
        '''
        Returns True iff every key/value pair in the given mapping passes
        typecheck. Only the supplied keys are examined; an empty mapping
        is therefore considered valid.
        '''
        return all(cls.typecheck(k, v) for k, v in attributes.items())

    @property
    def event_type(self):
        '''The stored 'event_type' value.'''
        return self['event_type']

    @event_type.setter
    def event_type(self, value):
        if Event.typecheck('event_type', value):
            self['event_type'] = value
        else:
            raise TypeError("cannot set event_type to " + repr(type(value)) + ", expecting: str")

    @property
    def random_string(self):
        '''The stored 'random_string' value.'''
        return self['random_string']

    @random_string.setter
    def random_string(self, value):
        if Event.typecheck('random_string', value):
            # Store under the schema field name (underscore) so the getter
            # above reads back exactly what was written here.
            self['random_string'] = value
        else:
            raise TypeError("cannot set random_string to " + repr(type(value)) + ", expecting: str")

    @property
    def event_id(self):
        '''The stored 'event_id' value.'''
        return self['event_id']

    @event_id.setter
    def event_id(self, value):
        if Event.typecheck('event_id', value):
            self['event_id'] = value
        else:
            raise TypeError("cannot set event_id to " + repr(type(value)) + ", expecting: long")

    @property
    def user_id(self):
        '''The stored 'user_id' value.'''
        return self['user_id']

    @user_id.setter
    def user_id(self, value):
        if Event.typecheck('user_id', value):
            self['user_id'] = value
        else:
            raise TypeError("cannot set user_id to " + repr(type(value)) + ", expecting: long")

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.