Skip to content

Instantly share code, notes, and snippets.

@dharamsk
Created April 14, 2018 00:33
Show Gist options
  • Save dharamsk/473d87f8f1cf1fe1af87296db8def2a9 to your computer and use it in GitHub Desktop.
Save dharamsk/473d87f8f1cf1fe1af87296db8def2a9 to your computer and use it in GitHub Desktop.
Avro Debugger Script
import argparse
import json
import random
from copy import deepcopy

import mock

from where_ever.stream_handler import *
from where_ever.tests.test_stream_handler import *
"""
This will check an avro schema against an example data record
It will tell you two things:
1) Which, if any, schema fields contain avro syntax errors
2) A list of fields in the data record that don't have field definitions in the schema
To use this:
- Change the imports above to import * from your stream_handler and any other necessary files
- Change the 3 Config Variables below
"""
# Config Variables
FIXTURE_OBJECT = EXAMPLE_RECORD # The name of your example data record imported by the test_stream_handler
avro_for_object = avro_for_record # The name of your stream_handler function that will perform any conversions on the output of the data record
AVRO_OBJECT_NAME = 'Record' # The name of your object in the avro protocol setup, used to get the correct schema from the protocol
# Setup
records = [dynamo_item_to_python_dict_converter(
record[u'dynamodb'][u'NewImage']) for record in [FIXTURE_OBJECT]]
batch = [avro_for_object(r) for r in records]
proto = avro.protocol.parse(json.dumps(YOUR_SPESHUL_PROTOCOL))
schema = proto.types_dict[AVRO_OBJECT_NAME]
# Functions
def _remove_a_field(schema):
i = random.randint(0,len(schema.fields)-1)
field = schema.fields[i]
del schema.fields[i]
return schema, field
def _replace_a_field(schema, field):
schema.fields.append(field)
return schema
def debug_avro_fields(schema, batch, serialize=None):
    """Find the schema fields that prevent *batch* from serializing.

    Works on deep copies, so the caller's *schema* and *batch* are untouched.
    Phase 1 removes randomly chosen fields until the batch serializes.
    Phase 2 re-adds the removed fields one at a time, most recently removed
    first. Any field whose re-addition makes serialization fail again is
    dropped and reported. Because removal is random, results can differ
    between runs when several fields interact.

    :param schema: record schema whose ``fields`` attribute is a mutable list
        (assumed to be an avro record schema; confirm for your avro version).
    :param batch: list of records to serialize.
    :param serialize: optional callable ``serialize(schema, batch)`` that
        raises on failure. Defaults to ``your_speshul_serializer.serialize``.
        Its return value is ignored, so a serializer that returns None or
        empty bytes on success is handled correctly.
    :returns: list of failing field objects; empty if the batch serializes
        as-is.
    :raises ValueError: if the batch still fails with every field removed,
        meaning the failure cannot be isolated by removing fields.
    """
    if serialize is None:
        # Looked up lazily so callers that pass their own serializer do not
        # need the placeholder name to exist.
        serialize = your_speshul_serializer.serialize

    z_schema = deepcopy(schema)
    z_batch = deepcopy(batch)
    removed = []
    failing = []

    def _serializes(candidate):
        # Success means "no exception", not "truthy return value".
        # Catch Exception rather than using a bare except, so Ctrl-C and
        # SystemExit still stop the script.
        try:
            serialize(candidate, z_batch)
        except Exception:
            return False
        return True

    # Phase 1: strip random fields until the batch serializes.
    while not _serializes(z_schema):
        if not z_schema.fields:
            raise ValueError(
                "batch does not serialize even with every schema field removed")
        z_schema, field = _remove_a_field(z_schema)
        removed.append(field)

    # Phase 2: restore fields, most recently removed first.
    while removed:
        z_schema = _replace_a_field(z_schema, removed.pop())
        if not _serializes(z_schema):
            # _replace_a_field appends, so the culprit is the last field.
            failing.append(z_schema.fields.pop())

    return failing
def missing_fields(schema, batch):
    """Return keys of the first record in *batch* that *schema* does not define.

    :param schema: record schema whose ``fields`` each expose a ``name``.
    :param batch: sequence of dict records; only ``batch[0]`` is inspected.
    :returns: list of unknown keys, in the record's key iteration order.
        An empty batch yields an empty list.
    """
    if not batch:
        # No record means no keys to check.
        return []
    # Build the name set once so each membership test is O(1).
    known = {f.name for f in schema.fields}
    return [key for key in batch[0] if key not in known]
# Run it
if __name__ == "__main__":
fails = debug_avro_fields(schema, batch)
if len(fails) > 0:
print "Found avro syntax errors: "
else:
print "Syntax looks good!"
for f in fails:
print f.name
missing = missing_fields(schema, batch)
if len(missing) > 0:
print "Found fields missing from schema: "
else:
print "All fields accounted for!"
for m in missing:
print m
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment