Created
July 23, 2017 16:11
-
-
Save chck/707107a403ca665986ed0f22e6539f1e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
from __future__ import absolute_import, print_function | |
import json | |
from ast import literal_eval | |
import numpy as np | |
import yaml | |
from avro.io import DatumReader, DatumWriter, BinaryEncoder, BinaryDecoder | |
from avro.schema import parse | |
"""Reference: | |
https://gist.github.com/blink1073/796e4eb01d43ebcec62a | |
""" | |
class BinaryDatumWriter:
    """Write Avro-encoded record datums to a binary buffer.

    numpy arrays found in a datum are converted before encoding:
    - to raw bytes (``ndarray.tobytes()``) when the field's Avro type holds
      raw bytes (see ``_BYTES_TYPES``);
    - to nested Python lists (``ndarray.tolist()``) otherwise.

    Args:
        schema: a parsed Avro schema, or a dict / YAML-or-JSON string that is
            passed through ``load_schema``.
        buf: binary file-like object that the ``BinaryEncoder`` writes to.
    """

    # Avro field types whose values are raw bytes. 'bytes' is the Avro
    # primitive name; 'binary' is not an Avro type but is kept so that
    # behaviour for any schema relying on the original check is unchanged.
    _BYTES_TYPES = ('fixed', 'bytes', 'binary')

    def __init__(self, schema, buf):
        if isinstance(schema, (dict, str)):
            schema = load_schema(schema)
        self.schema = schema
        self._writer = DatumWriter(schema)
        self._encoder = BinaryEncoder(buf)
        self.buf = buf

    def write(self, datum):
        """Encode one record ``datum`` (a dict keyed by field name) into ``buf``.

        The caller's ``datum`` is not modified; ndarray conversion happens on
        a shallow copy.

        Raises:
            KeyError: if an ndarray value's key is not a field of the schema.
        """
        encoded = dict(datum)
        for key, value in datum.items():
            if not isinstance(value, np.ndarray):
                continue
            field = self.schema.fields_dict[key]
            if field.type.props['type'] in self._BYTES_TYPES:
                encoded[key] = value.tobytes()
            else:
                encoded[key] = value.tolist()
        self._writer.write(encoded, self._encoder)
class BinaryDatumReader:
    """Read Avro-encoded record datums back from a binary buffer.

    Args:
        writer_schema: schema the data was written with; a parsed Avro schema
            or a dict / YAML-or-JSON string passed through ``load_schema``.
        buf: binary file-like object the ``BinaryDecoder`` reads from.
        reader_schema: optional schema to resolve records into; accepted in
            the same forms as ``writer_schema``. When it is falsy,
            ``self.schema`` falls back to ``writer_schema``.
    """

    def __init__(self, writer_schema, buf, reader_schema=None):
        schema_like = (dict, str)
        if isinstance(writer_schema, schema_like):
            writer_schema = load_schema(writer_schema)
        if isinstance(reader_schema, schema_like):
            reader_schema = load_schema(reader_schema)
        self.schema = reader_schema if reader_schema else writer_schema
        self._reader = DatumReader(writer_schema, reader_schema)
        self._decoder = BinaryDecoder(buf)
        self.buf = buf

    def read(self):
        """Decode and return the next record from ``buf``."""
        decoder = self._decoder
        return self._reader.read(decoder)
def unpack_datum(schema, datum):
    """Turn ndarray-tagged byte fields of a decoded datum back into arrays.

    For each field whose type carries ``logicalType: ndarray`` and whose Avro
    type holds raw bytes ('fixed' or 'bytes'; 'binary' is also accepted for
    backward compatibility), the bytes stored in ``datum`` are reinterpreted
    with ``np.frombuffer`` using the field's ``dtype`` prop and, when a
    ``shape`` prop is present, reshaped to that shape. Other fields are left
    untouched.

    Args:
        schema: parsed Avro record schema, or a dict / YAML-or-JSON string that
            is passed through ``load_schema``.
        datum: dict of field values (e.g. from ``BinaryDatumReader.read``);
            modified in place.

    Returns:
        The same ``datum`` dict.

    Raises:
        SyntaxError: if a ``shape`` prop cannot be parsed as a Python literal.
        ValueError: if the decoded array cannot be reshaped to ``shape``.
    """
    if isinstance(schema, (dict, str)):
        schema = load_schema(schema)
    for field in schema.fields:
        props = field.type.props
        if props.get('logicalType') != 'ndarray':
            continue
        if props['type'] not in ('fixed', 'bytes', 'binary'):
            continue
        name = field.name
        # frombuffer shares memory with the bytes object, so the result is
        # read-only; copy it downstream if it needs to be mutated.
        array = np.frombuffer(datum[name], dtype=np.dtype(props['dtype']))
        if 'shape' in props:
            try:
                shape = literal_eval(props['shape'])
            except SyntaxError:
                msg = 'Could not parse shape: "%s"' % props['shape']
                raise SyntaxError(msg)
            # The shape was previously parsed but never applied, so arrays
            # came back flat.
            array = array.reshape(shape)
        datum[name] = array
    return datum
def load_schema(schema):
    """Parse an Avro schema given as a dict or as a YAML/JSON string.

    Args:
        schema: a dict describing the schema, or a string that is parsed with
            PyYAML (JSON schema strings generally parse as YAML as well).

    Returns:
        The schema object returned by ``avro.schema.parse``.
    """
    if not isinstance(schema, dict):
        # safe_load instead of load: since PyYAML 6, yaml.load() requires an
        # explicit Loader, and the full loader can construct arbitrary Python
        # objects from tagged input. Schemas only need plain mappings/lists.
        schema = yaml.safe_load(schema)
    return parse(json.dumps(schema))
if __name__ == '__main__':
    # Benchmark: encode 1 + n_records records into an in-memory buffer, dump
    # the buffer to 'test.dat', then decode all records back and print the
    # first one with its ndarray field restored.
    # NOTE: each record carries a 1920*1024-byte array, so the buffer grows
    # to roughly 2 GB with the default n_records.
    from io import BytesIO
    from time import time

    n_records = 1000
    schema = {"namespace": "example.avro",
              "type": "record",
              "name": "User",
              "fields": [
                  {"name": "name", "type": "string"},
                  {"name": "favorite_number", "type": ["int", "null"]},
                  {"name": "favorite_color", "type": ["string", "null"]},
                  {"name": "yo", "type": {"type": "array", "items": ["null", "string"]}},
                  {"name": "fixed_big",
                   "type": {"type": "fixed", "name": "image", "size": 1920 * 1024, "logicalType": "ndarray",
                            "dtype": "uint8", "shape": "(1920,1024)"}},
              ]}
    buf = BytesIO()
    bdata = np.ones((1920, 1024), dtype=np.uint8)
    writer = BinaryDatumWriter(schema, buf)
    writer.write({
        "name": "Alyssa",
        "favorite_number": 256,
        "yo": [None, "hey"],
        "fixed_big": bdata,
    })

    # Each phase resets the timer so the printed figures are per-phase
    # durations rather than cumulative time since the first write.
    t0 = time()
    for _ in range(n_records):
        writer.write({
            "name": "Ben",
            "favorite_number": 7,
            "yo": [None],
            "favorite_color": "red",
            "fixed_big": bdata,
        })
    print('Write to buf:', time() - t0)

    t0 = time()
    with open('test.dat', 'wb') as f:
        f.write(buf.getvalue())
    print('Write to disk:', time() - t0)

    buf.seek(0)
    reader = BinaryDatumReader(schema, buf)
    d1 = reader.read()
    t0 = time()
    for _ in range(n_records):
        reader.read()
    print('Read from buf:', time() - t0)
    print(unpack_datum(schema, d1))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment