Last active
February 21, 2018 22:26
-
-
Save ottomata/f38a6514b9efc8678550 to your computer and use it in GitHub Desktop.
eventlogging avro code WIP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# for validation avro records | |
import pyavroc | |
# pyavroc>=0.7.1 | |
# ---- AVRO stuff, to do more of this --- | |
# Root directory containing Avro schema files, laid out as
# <root>/<SchemaName>/<SchemaName>.<Revision>.avsc
SCHEMA_AVRO_FILE_PATH = './schemas/avro'

# str.format template for a single schema file path.
# Format args are (schema_name, revision) -- the scid tuple.
SCHEMA_AVRO_FILE_FORMAT = os.path.join(
    SCHEMA_AVRO_FILE_PATH, '{0}', '{0}.{1}.avsc'
)

# In-memory caches, keyed by scid tuple (schema_name, revision).
avro_schema_cache = {}       # scid -> raw Avro schema string (.avsc contents)
avro_serializer_cache = {}   # scid -> pyavroc.AvroSerializer for that schema
def load_avro_schemas(schemas_path=SCHEMA_AVRO_FILE_PATH):
    """
    Walks schemas_path looking for schema files and loads each one into
    the in-memory avro_schema_cache (raw schema string) and
    avro_serializer_cache (pyavroc.AvroSerializer), keyed by the scid
    extracted from the matched filename.

    :param schemas_path: root directory to walk for .avsc schema files.
    """
    for dirpath, subdirs, files in os.walk(schemas_path):
        for filename in files:
            # Lazy %-args: the format work only happens if INFO is enabled.
            logging.info("Loading Avro schema from %s", filename)
            scid = scid_from_filename(filename)
            # Read the schema from where os.walk actually found it so a
            # non-default schemas_path is honored.  (Previously the path
            # was rebuilt from SCHEMA_AVRO_FILE_FORMAT, which silently
            # ignored the schemas_path argument.)
            filepath = os.path.join(dirpath, filename)
            with open(filepath) as schema_file:
                schema = schema_file.read()
            avro_schema_cache[scid] = schema
            avro_serializer_cache[scid] = pyavroc.AvroSerializer(schema)
# TODO: Lookup latest schema by name without revision?
def get_avro_schema(scid):
    """
    Returns the cached raw Avro schema string for scid, or None (after
    logging a warning) if no schema with that scid has been loaded.
    """
    try:
        return avro_schema_cache[scid]
    except KeyError:
        # logging.warn is deprecated; logging.warning with lazy %-args
        # is the supported spelling.
        logging.warning("%s not a loaded avro schema", scid)
        return None
def avro_validate(scid, record):
    """
    Returns True if record validates against the loaded Avro schema
    identified by scid; False if validation fails or no schema with
    that scid has been loaded.
    """
    schema = get_avro_schema(scid)
    if not schema:
        return False
    # pyavroc.validate reports success as 0.
    return pyavroc.validate(record, schema) == 0
def avro_serialize(scid, record):
    """
    Serializes record to Avro binary using the cached serializer for
    scid.  Raises KeyError if no serializer is loaded for scid.
    """
    serializer = avro_serializer_cache[scid]
    return serializer.serialize(record)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# --- AVRO stuff, do more of this later -- | |
# routes = [ | |
# (r"/event/([\w\d\-_]+)", EventHandler), # POST /event/topic | |
# (r"/raw", RawHandler), | |
# (r"/avrojson/(\w+-\d+)", AvroJsonHandler), # POST /avrojson/SCID | |
# (r"/avrobinary/(\w+-\d+)", AvroBinaryHandler), # POST /avrobinary/SCID | |
# ] | |
# avro_json_writer = get_writer(kafka_base_uri + '?topic=eventlogging_aj_%(schema)s') | |
# | |
# # TODO: Figure out how to interpolate or pass topic to binary writer | |
# avro_binary_writer = get_writer(kafka_base_uri + '?topic=eventlogging_binary&noencode=True') | |
# raw_writer = get_writer(kafka_base_uri + '?topic=test') | |
# TODO: Encapsulate Avro records with EventCapsule too? | |
# TODO: Figure out if we can make use of Avro Schemas that look
#       exactly like EventLogging encapsulated JSON Schemas.
def process_avro_event(scid, raw_event, callback=None):
    """
    Parses raw_event as JSON and validates it against the Avro schema
    identified by scid.  The schema must exist at
    schemas/avro/.../Name.Revision.avsc and already be loaded.

    If the event validates, the parsed record is passed to callback
    (when given) or returned.

    :raises ProcessException: if the record does not validate.
    """
    record = json.loads(raw_event)
    # Guard clause: reject invalid records before doing anything else.
    if not avro_validate(scid, record):
        # TODO: Fill in default required fields?
        error_message = 'Unable to validate: %s' % (record)
        logging.error(error_message)
        raise ProcessException(error_message)
    if callback:
        callback(record)
    else:
        return record
def process_avro_binary_event(scid, raw_event, validate=avro_validate, callback=None):
    """
    Parses raw_event as JSON and validates it against the Avro schema
    identified by scid.  The schema must exist at
    schemas/avro/.../Name.Revision.avsc and already be loaded.

    If the event validates, the record is serialized to Avro binary and
    passed to callback (when given) or returned.

    :param validate: callable (scid, record) -> bool used to validate
                     the record; defaults to avro_validate.
    :raises ProcessException: if the record does not validate.
    """
    record = json.loads(raw_event)
    # Use the injected validate function.  (Previously the validate
    # parameter was accepted but ignored: avro_validate was always
    # called directly.)
    if not validate(scid, record):
        error_message = 'Unable to validate: %s' % (record)
        logging.error(error_message)
        raise ProcessException(error_message)
    binary_record = avro_serialize(scid, record)
    if callback:
        callback(binary_record)
    else:
        return binary_record
class RawHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self):
        """
        Reads a raw event string from the POST body and writes it,
        unvalidated, to the test topic via raw_writer.
        """
        body = self.request.body
        raw_event = body.decode('utf-8')
        raw_writer.send(raw_event)
        # 204: accepted, no response body.
        self.set_status(204, 'Raw event accepted.')
class AvroJsonHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self, scid):
        """
        Reads an AvroJSON event string from the POST body, validates it
        against the Avro schema identified by the SCID from the URL
        (formatted as Name-Revision), and then writes it as AvroJSON
        to Kafka.  Responds 550 if validation fails, 204 on success.
        """
        # The route regex (\w+-\d+) guarantees a Name-Revision shape.
        scid_parts = scid.split('-')
        scid = (scid_parts[0], int(scid_parts[1]))
        raw_event = self.request.body.decode('utf-8')
        try:
            record = yield tornado.gen.Task(process_avro_event, scid, raw_event)
        except ProcessException as e:
            # str(e) instead of e.message: BaseException.message does
            # not exist on Python 3.
            self.set_status(550, str(e))
        else:
            avro_json_writer.send(record)
            self.set_status(204, 'Record accepted.')
class AvroBinaryHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def post(self, scid):
        """
        Reads a JSON event string from the POST body, validates it
        against the Avro schema identified by the SCID from the URL
        (formatted as Name-Revision), serializes it as Avro binary,
        and writes it to Kafka.  Responds 550 if validation fails,
        204 on success.
        """
        # The route regex (\w+-\d+) guarantees a Name-Revision shape.
        scid_parts = scid.split('-')
        scid = (scid_parts[0], int(scid_parts[1]))
        raw_event = self.request.body.decode('utf-8')
        try:
            record = yield tornado.gen.Task(process_avro_binary_event, scid, raw_event)
        except ProcessException as e:
            # str(e) instead of e.message: BaseException.message does
            # not exist on Python 3.
            self.set_status(550, str(e))
        else:
            avro_binary_writer.send(record)
            self.set_status(204, 'Record accepted.')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment