Skip to content

Instantly share code, notes, and snippets.

@chck
Created July 23, 2017 16:11
Show Gist options
  • Save chck/707107a403ca665986ed0f22e6539f1e to your computer and use it in GitHub Desktop.
Save chck/707107a403ca665986ed0f22e6539f1e to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from __future__ import absolute_import, print_function
import json
from ast import literal_eval
import numpy as np
import yaml
from avro.io import DatumReader, DatumWriter, BinaryEncoder, BinaryDecoder
from avro.schema import parse
"""Reference:
https://gist.github.com/blink1073/796e4eb01d43ebcec62a
"""
class BinaryDatumWriter:
    """Serialize records to an Avro binary stream, converting numpy arrays.

    Any ``np.ndarray`` value in a record is converted before it is handed to
    the Avro ``DatumWriter``:

    * fields whose schema type is ``fixed`` or ``bytes`` receive the array's
      raw buffer (``ndarray.tobytes()``);
    * every other field receives a nested Python list (``ndarray.tolist()``).
    """

    # Avro schema types whose datum is a raw byte string.  (The Avro primitive
    # is called "bytes"; "binary" is not an Avro type name.)
    _RAW_BYTE_TYPES = ('fixed', 'bytes')

    def __init__(self, schema, buf):
        """
        :param schema: a parsed Avro schema, or a dict / YAML-or-JSON string
            that ``load_schema`` parses.
        :param buf: writable binary file-like object the encoder writes to.
        """
        if isinstance(schema, (dict, str)):
            schema = load_schema(schema)
        self.schema = schema
        self._writer = DatumWriter(schema)
        self._encoder = BinaryEncoder(buf)
        self.buf = buf

    def write(self, datum):
        """Encode one record (a mapping of field name -> value) into ``buf``.

        The caller's mapping is left unmodified; ndarray conversion is done on
        a shallow copy.

        :raises KeyError: if a key holding an ndarray is not a schema field.
        """
        record = dict(datum)
        for key, value in datum.items():
            if not isinstance(value, np.ndarray):
                continue
            field_type = self.schema.fields_dict[key].type.props['type']
            if field_type in self._RAW_BYTE_TYPES:
                record[key] = value.tobytes()
            else:
                record[key] = value.tolist()
        self._writer.write(record, self._encoder)
class BinaryDatumReader:
    """Decode Avro binary-encoded records one at a time from a byte buffer.

    Schemas may be given already parsed, or as a dict / YAML-or-JSON string,
    in which case ``load_schema`` parses them.  An optional reader schema is
    passed to ``DatumReader`` together with the writer schema.
    """

    def __init__(self, writer_schema, buf, reader_schema=None):
        """
        :param writer_schema: schema the data in ``buf`` was written with.
        :param buf: readable binary file-like object holding encoded records.
        :param reader_schema: optional schema to read the data as.
        """
        def as_schema(candidate):
            # Raw definitions (dict / str) get parsed; anything else,
            # including None, is passed through unchanged.
            if isinstance(candidate, (dict, str)):
                return load_schema(candidate)
            return candidate

        writer_schema = as_schema(writer_schema)
        reader_schema = as_schema(reader_schema)
        # The reader schema, when given, is what decoded records follow.
        self.schema = reader_schema or writer_schema
        self._reader = DatumReader(writer_schema, reader_schema)
        self._decoder = BinaryDecoder(buf)
        self.buf = buf

    def read(self):
        """Decode and return the next record from the buffer."""
        decoder = self._decoder
        return self._reader.read(decoder)
def _parse_shape(raw):
    """Return a value usable by ``ndarray.reshape`` from a schema ``shape`` prop.

    Lists, tuples and ints (as a JSON schema may carry them) are used as-is;
    anything else is parsed as a Python literal, e.g. ``"(1920,1024)"``.

    :raises SyntaxError: if the literal cannot be parsed.
    """
    if isinstance(raw, (list, tuple, int)):
        return raw
    try:
        return literal_eval(raw)
    except SyntaxError:
        msg = 'Could not parse shape: "%s"' % raw
        raise SyntaxError(msg)


def unpack_datum(schema, datum):
    """Turn raw-byte ``ndarray`` fields of a decoded record back into arrays.

    For every schema field whose type has ``logicalType: ndarray`` and is
    stored as Avro ``fixed`` or ``bytes``, the bytes in ``datum`` are
    reinterpreted with the field's ``dtype`` property and, when a ``shape``
    property is present, reshaped to it.  All other fields are left unchanged.

    :param schema: a parsed Avro record schema, or a dict / YAML-or-JSON
        string that ``load_schema`` parses.
    :param datum: dict produced by reading a record; updated in place.
    :returns: ``datum`` itself, for convenience.
    :raises SyntaxError: if a string ``shape`` property cannot be parsed.
    """
    if isinstance(schema, (dict, str)):
        schema = load_schema(schema)
    for field in schema.fields:
        props = field.type.props
        if props.get('logicalType') != 'ndarray':
            continue
        if props['type'] not in ('fixed', 'bytes'):
            # Only raw-byte encodings need decoding here.
            continue
        name = field.name
        array = np.frombuffer(datum[name], dtype=np.dtype(props['dtype']))
        if 'shape' in props:
            array = array.reshape(_parse_shape(props['shape']))
        # Assign only once decoding fully succeeded.
        datum[name] = array
    return datum
def load_schema(schema):
    """Parse an Avro schema given as a dict or as a YAML/JSON string.

    :param schema: a dict, or a string of YAML (JSON text generally parses
        as YAML too).
    :returns: the schema object produced by ``avro.schema.parse``.
    """
    if not isinstance(schema, dict):
        # safe_load: a schema is plain data.  Bare yaml.load() can build
        # arbitrary Python objects, is deprecated since PyYAML 5.1 and raises
        # TypeError without an explicit Loader on PyYAML >= 6.
        schema = yaml.safe_load(schema)
    # avro's parse() takes JSON text, so round-trip the dict through json.
    return parse(json.dumps(schema))
if __name__ == '__main__':
    # Demo / micro-benchmark: write 1 + n_records records, each carrying a
    # (1920, 1024) uint8 array, to an in-memory buffer; dump the buffer to
    # test.dat; read the records back; unpack the array of the first record.
    from io import BytesIO
    from time import time

    n_records = 1000
    schema = {"namespace": "example.avro",
              "type": "record",
              "name": "User",
              "fields": [
                  {"name": "name", "type": "string"},
                  {"name": "favorite_number", "type": ["int", "null"]},
                  {"name": "favorite_color", "type": ["string", "null"]},
                  {"name": "yo", "type": {"type": "array", "items": ["null", "string"]}},
                  {"name": "fixed_big",
                   "type": {"type": "fixed", "name": "image", "size": 1920 * 1024, "logicalType": "ndarray",
                            "dtype": "uint8", "shape": "(1920,1024)"}},
              ]}
    buf = BytesIO()
    bdata = np.ones((1920, 1024), dtype=np.uint8)
    writer = BinaryDatumWriter(schema, buf)
    writer.write({
        "name": "Alyssa",
        "favorite_number": 256,
        "yo": [None, "hey"],
        "fixed_big": bdata,
    })

    # Each phase is timed on its own: t0 is reset before every measurement.
    t0 = time()
    for _ in range(n_records):
        writer.write({
            "name": "Ben",
            "favorite_number": 7,
            "yo": [None],
            "favorite_color": "red",
            "fixed_big": bdata,
        })
    print('Write to buf:', time() - t0)

    t0 = time()
    with open('test.dat', 'wb') as f:
        f.write(buf.getvalue())
    print('Write to disk:', time() - t0)

    buf.seek(0)
    reader = BinaryDatumReader(schema, buf)
    d1 = reader.read()
    t0 = time()
    for _ in range(n_records):
        reader.read()
    print('Read from buf:', time() - t0)
    print(unpack_datum(schema, d1))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment