Skip to content

Instantly share code, notes, and snippets.

@ContrastingSounds
Created January 15, 2018 09:14
Show Gist options
  • Save ContrastingSounds/c7048e375c88d3a85720554dbd0190a4 to your computer and use it in GitHub Desktop.
Save ContrastingSounds/c7048e375c88d3a85720554dbd0190a4 to your computer and use it in GitHub Desktop.
Stub functions to dynamically create an Avro schema, and save records to file.
import json
import avro.schema
from avro.datafile import DataFileWriter
from avro.io import DatumWriter
# Default file names used by this module (schema definition and record output).
STATE_SCHEMA = 'state_schema.avsc'
STATE_RECORDS = 'state_records.avro'

# Empty model template: one distinct set of column names per field kind.
# create_avro_schema reads exactly these three keys.
model_dictionary = {kind: set() for kind in ('dimensions', 'integers', 'floats')}
def create_avro_schema(model: dict, file_name: str):
    """
    Build the Avro schema for a 'State' record, save it to file and return the parsed schema.

    The schema always contains the fixed fields first_created, entity, uuid and parent_uuid.
    One nullable field is then appended per column named in the model: 'dimensions' become
    strings, 'integers' become ints and 'floats' become floats.

    :param model: dictionary with 'dimensions', 'integers' and 'floats' entries, each an
        iterable of column names (see http://avro.apache.org/ for the schema format produced)
    :param file_name: name of file to write the Avro schema to (used exactly as given)
    :return: schema object returned by the avro library's schema parser
    :raises KeyError: if model lacks one of the three expected entries
    """
    def _ordered(columns):
        # Sets iterate in an order that can differ between runs, which would make the saved
        # schema unstable; sort them. Ordered inputs (lists, tuples) keep the caller's order.
        if isinstance(columns, (set, frozenset)):
            return sorted(columns)
        return columns

    state = {
        'namespace': 'state.avro',
        'type': 'record',
        'name': 'State',
        'fields': [
            {'name': 'first_created', 'type': ['int', 'null']},
            {'name': 'entity', 'type': 'string'},
            {'name': 'uuid', 'type': 'string'},
            {'name': 'parent_uuid', 'type': ['string', 'null']},
        ]
    }

    # Model entry -> Avro primitive type; every dynamic column is a nullable union.
    column_types = (
        ('dimensions', 'string'),
        ('integers', 'int'),
        ('floats', 'float'),
    )
    for model_key, avro_type in column_types:
        state['fields'] += [
            {'name': column, 'type': [avro_type, 'null']}
            for column in _ordered(model[model_key])
        ]

    with open(file_name, 'w') as schema_file:
        json.dump(state, schema_file, indent=4)

    # The current 'avro' package exposes parse(); the legacy 'avro-python3' package exposed
    # Parse(). Use whichever is available so the function works with both.
    parse_schema = getattr(avro.schema, 'parse', None) or avro.schema.Parse
    return parse_schema(json.dumps(state))
def save_avro_output(model: dict, file_name: str, records: list):
    """
    Given a list of records, saves to file in Avro format.

    The schema is generated from the model with create_avro_schema, which also writes the
    schema definition to the file named by the module constant STATE_SCHEMA.

    :param model: dictionary of model definition, passed through to create_avro_schema
    :param file_name: name of file to write Avro data
    :param records: list of dicts, containing the actual records to be saved
    :return: None
    """
    # create_avro_schema requires the schema file name as its second argument; calling it
    # with the model alone raises TypeError.
    schema = create_avro_schema(model, STATE_SCHEMA)

    # The with-block guarantees the file is released even if the writer cannot be built.
    with open(file_name, 'wb') as avro_output:
        writer = DataFileWriter(avro_output, DatumWriter(), schema)
        try:
            for record in records:
                writer.append(record)
        finally:
            # Always close the writer so it is not left open when an append fails.
            writer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment