Skip to content

Instantly share code, notes, and snippets.

@ottomata
Last active February 21, 2018 22:26
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ottomata/f38a6514b9efc8678550 to your computer and use it in GitHub Desktop.
eventlogging avro code WIP
# For validating Avro records.
import os

import pyavroc
# pyavroc>=0.7.1

# ---- AVRO stuff, to do more of this ---

# Root directory under which Avro schema files live.
SCHEMA_AVRO_FILE_PATH = './schemas/avro'
# Path template for a single schema file:
# <root>/<Name>/<Name>.<Revision>.avsc, filled via .format(name, revision).
SCHEMA_AVRO_FILE_FORMAT = os.path.join(
    SCHEMA_AVRO_FILE_PATH, '{0}', '{0}.{1}.avsc'
)

# In-memory caches keyed by scid — presumably a (name, revision) tuple,
# as produced by scid_from_filename; confirm against that helper.
avro_schema_cache = {}
avro_serializer_cache = {}
def load_avro_schemas(schemas_path=SCHEMA_AVRO_FILE_PATH):
    """
    Walks schemas_path looking for schema files and loads each one into
    avro_schema_cache (raw schema text) and avro_serializer_cache
    (a pyavroc.AvroSerializer built from that text), keyed by the scid
    extracted from the matched filename.
    """
    for path, subdirs, files in os.walk(schemas_path):
        for filename in files:
            logging.info("Loading Avro schema from %s" % filename)
            scid = scid_from_filename(filename)
            # Bug fix: open the file where os.walk actually found it.
            # Rebuilding the path from SCHEMA_AVRO_FILE_FORMAT (as the
            # original did) hard-codes the default root and ignores a
            # non-default schemas_path argument.
            filepath = os.path.join(path, filename)
            # 'schema_file' rather than 'file', which shadows the builtin.
            with open(filepath) as schema_file:
                schema = schema_file.read()
            avro_schema_cache[scid] = schema
            avro_serializer_cache[scid] = pyavroc.AvroSerializer(schema)
# TODO: Lookup latest schema by name without revision?
def get_avro_schema(scid):
    """
    Returns the cached Avro schema text for scid, or None (with a
    warning logged) if no schema with that scid has been loaded.
    """
    if scid in avro_schema_cache:
        return avro_schema_cache[scid]
    else:
        # logging.warn is deprecated; logging.warning is the supported name.
        logging.warning("%s not a loaded avro schema" % scid)
        return None
def avro_validate(scid, record):
    """
    Validates record against the Avro schema identified by scid.

    Returns True only when a schema is loaded for scid and pyavroc
    reports the record as valid; False otherwise.
    """
    schema = get_avro_schema(scid)
    # Guard clause: no loaded schema means the record cannot validate.
    if not schema:
        return False
    return pyavroc.validate(record, schema) == 0
def avro_serialize(scid, record):
    """
    Serializes record to Avro binary using the cached serializer for
    scid. Raises KeyError if no serializer has been loaded for scid.
    """
    serializer = avro_serializer_cache[scid]
    return serializer.serialize(record)
# --- AVRO stuff, do more of this later --
# routes = [
# (r"/event/([\w\d\-_]+)", EventHandler), # POST /event/topic
# (r"/raw", RawHandler),
# (r"/avrojson/(\w+-\d+)", AvroJsonHandler), # POST /avrojson/SCID
# (r"/avrobinary/(\w+-\d+)", AvroBinaryHandler), # POST /avrobinary/SCID
# ]
# avro_json_writer = get_writer(kafka_base_uri + '?topic=eventlogging_aj_%(schema)s')
#
# # TODO: Figure out how to interpolate or pass topic to binary writer
# avro_binary_writer = get_writer(kafka_base_uri + '?topic=eventlogging_binary&noencode=True')
# raw_writer = get_writer(kafka_base_uri + '?topic=test')
# TODO: Encapsulate Avro records with EventCapsule too?
# TODO: Figure out if we can make use of Avro Schemas that look
# exactly like EventLogging encapsulated JSON Schemas.
def process_avro_event(scid, raw_event, callback=None):
    """
    Loads raw_event as a JSON object and validates it against the Avro
    schema identified by scid (expected to exist at
    schemas/avro/.../Name.Revision.avsc).

    On success, passes the parsed record to callback if one was given,
    otherwise returns it. Raises ProcessException when validation fails.
    """
    record = json.loads(raw_event)
    if avro_validate(scid, record):
        if callback:
            callback(record)
        else:
            return record
    else:
        # TODO: Fill in default required fields?
        msg = 'Unable to validate: %s' % record
        logging.error(msg)
        raise ProcessException(msg)
def process_avro_binary_event(scid, raw_event, validate=avro_validate, callback=None):
    """
    Loads raw_event as a JSON object and validates it with the given
    validate function (defaults to avro_validate) against the Avro
    schema identified by scid, expected to exist at
    schemas/avro/.../Name.Revision.avsc.

    On success, serializes the record to Avro binary and passes it to
    callback if one was given, otherwise returns it. Raises
    ProcessException when validation fails.
    """
    record = json.loads(raw_event)
    # Bug fix: use the injected validate function. The original accepted
    # the parameter but ignored it and always called avro_validate
    # directly; the default keeps the old behavior.
    if not validate(scid, record):
        error_message = 'Unable to validate: %s' % (record)
        logging.error(error_message)
        raise ProcessException(error_message)
    else:
        binary_record = avro_serialize(scid, record)
        if callback:
            callback(binary_record)
        else:
            return binary_record
class RawHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self):
        """
        Reads a raw event string from the POST body and writes it,
        unmodified, to the test topic via raw_writer.
        """
        body = self.request.body.decode('utf-8')
        raw_writer.send(body)
        self.set_status(204, 'Raw event accepted.')
class AvroJsonHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self, scid):
        """
        Reads an Avro JSON event string from the POST body, validates
        it against the Avro schema identified by the SCID captured from
        the URL (Name-Revision), and writes the record to Kafka via
        avro_json_writer. Responds 550 on validation failure, 204 on
        success.
        """
        # URL captures SCID as 'Name-Revision'; normalize to a
        # (name, int(revision)) tuple as used by the schema caches.
        scid_parts = scid.split('-')
        scid = (scid_parts[0], int(scid_parts[1]))
        raw_event = self.request.body.decode('utf-8')
        try:
            record = yield tornado.gen.Task(process_avro_event, scid, raw_event)
        except ProcessException as e:
            # str(e) rather than e.message: the .message attribute is
            # deprecated and absent on Python 3 exceptions.
            self.set_status(550, str(e))
        else:
            avro_json_writer.send(record)
            self.set_status(204, 'Record accepted.')
class AvroBinaryHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self, scid):
        """
        Reads a JSON event string from the POST body, validates it
        against the Avro schema identified by the SCID captured from
        the URL (Name-Revision), serializes it to Avro binary, and
        writes it to Kafka via avro_binary_writer. Responds 550 on
        validation failure, 204 on success.
        """
        # URL captures SCID as 'Name-Revision'; normalize to a
        # (name, int(revision)) tuple as used by the schema caches.
        scid_parts = scid.split('-')
        scid = (scid_parts[0], int(scid_parts[1]))
        raw_event = self.request.body.decode('utf-8')
        try:
            record = yield tornado.gen.Task(process_avro_binary_event, scid, raw_event)
        except ProcessException as e:
            # str(e) rather than e.message: the .message attribute is
            # deprecated and absent on Python 3 exceptions.
            self.set_status(550, str(e))
        else:
            avro_binary_writer.send(record)
            self.set_status(204, 'Record accepted.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment