Skip to content

Instantly share code, notes, and snippets.

@tkaemming
Last active April 22, 2016 18:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tkaemming/358121b873248adedad559e44824c3b5 to your computer and use it in GitHub Desktop.
Save tkaemming/358121b873248adedad559e44824c3b5 to your computer and use it in GitHub Desktop.
*.pyc
schemata/
# Generate the JSON schema files in schemata/ from the Avro IDL definition.
schemata:
	avro-tools idl2schemata schema.avdl schemata
avro
click
msgpack-python
fastavro
ujson
// Avro IDL for the benchmark payload. The Makefile runs
// `avro-tools idl2schemata` on this file to produce the schemata/ directory.
protocol Schema {
  // One breadcrumb event; mirrors the sample dict used by the benchmark script.
  record Breadcrumb {
    // NOTE(review): Avro `float` is 32-bit single precision, which cannot
    // hold an epoch timestamp such as 1461274494.77 exactly -- confirm
    // whether `double` was intended before relying on decoded values.
    float timestamp;
    string type;
    // Free-form attributes; each value is either a string or an int.
    map<union {string, int}> data;
  }
}
import click
import functools
import json
import msgpack
import os
import time
import ujson
from avro.io import (
BinaryEncoder,
DatumWriter,
)
from avro.schema import parse
from fastavro.writer import write_record as fastavro_write_record
from cStringIO import StringIO
# Sample payload serialized by every encoder on each benchmark iteration.
# Its fields follow the Breadcrumb record in schema.avdl: a numeric
# timestamp, a string type, and a map whose values are strings or ints.
breadcrumb = {
    "timestamp": 1461274494.77,
    "type": "http_request",
    "data": {
        "url": "/api/0/internal/health/",
        "status_code": 403,
        "method": "GET",
    },
}
# Raw JSON text of the Breadcrumb schema, read once at import time. The file
# is produced from schema.avdl by the Makefile's `schemata` target.
# `open` inside a `with` block closes the handle deterministically and, unlike
# the `file` builtin, is not specific to Python 2.
with open('schemata/Breadcrumb.avsc') as schema_file:
    avro_schema = schema_file.read()
def avro_writer(f):
    """Build a writer that Avro-encodes records into ``f`` using the ``avro`` package.

    The module-level ``avro_schema`` text is parsed once here; the returned
    callable takes the datum to encode and forwards it to
    ``DatumWriter.write`` with a ``BinaryEncoder`` bound to ``f``.
    """
    datum_writer = DatumWriter(parse(avro_schema))
    binary_encoder = BinaryEncoder(f)
    return functools.partial(datum_writer.write, encoder=binary_encoder)
def fastavro_writer(f):
    """Build a writer that encodes records into ``f`` with fastavro.

    The module-level ``avro_schema`` JSON text is decoded once per writer,
    not once per record; the returned callable passes each record to
    fastavro's ``write_record`` together with that decoded schema.
    """
    decoded_schema = json.loads(avro_schema)

    def encode(record):
        fastavro_write_record(f, record, decoded_schema)

    return encode
def json_writer(f):
    """Build a writer that serializes values into ``f`` with the stdlib ``json`` module.

    The returned callable is ``json.dump`` with its ``fp`` argument already
    bound to ``f``, so callers only supply the value to serialize.
    """
    dump_into_f = functools.partial(json.dump, fp=f)
    return dump_into_f
def ujson_writer(f):
    """Build a writer that serializes values into ``f`` with ``ujson``.

    The returned callable takes one value and hands back whatever
    ``ujson.dump`` returns for it.
    """
    return lambda value: ujson.dump(value, f)
def msgpack_writer(f):
    """Build a writer that msgpack-encodes values and appends the bytes to ``f``."""

    def pack_and_write(value):
        packed = msgpack.packb(value)
        f.write(packed)

    return pack_and_write
# Registry of benchmarkable encoders: the name accepted by the --encoder
# option mapped to a factory that takes a file-like object and returns a
# one-argument "write this value" callable.
encoders = {
    'avro': avro_writer,
    'fastavro': fastavro_writer,
    'json': json_writer,
    'msgpack': msgpack_writer,
    'ujson': ujson_writer,
}
@click.command()
@click.option(
    '-i', '--iterations',
    # IntRange rejects 0 and negative counts up front; they would otherwise
    # cause a ZeroDivisionError or meaningless per-item figures below.
    type=click.IntRange(min=1),
    # Plain int literal: the original 1e5 was a float default on an int option.
    default=100000,
    help='Number of times the sample breadcrumb is encoded.',
)
@click.option(
    '-e', '--encoder',
    type=click.Choice(sorted(encoders)),
    help='Encoder to benchmark; all encoders are run when omitted.',
)
def run(encoder, iterations):
    """Benchmark serialization of a sample breadcrumb.

    Encodes the breadcrumb the requested number of times with the chosen
    encoder (or each registered encoder in turn) and reports elapsed time
    and output size.
    """
    stdout = click.get_text_stream('stdout')

    def test(name):
        # Each encoder gets a fresh in-memory buffer so sizes are independent.
        f = StringIO()
        writer = encoders[name](f)

        stdout.write("using encoder: {}\n".format(name))

        # Only the encode loop and the final flush are timed; building the
        # writer (e.g. schema parsing) happens before the clock starts.
        start = time.time()
        for _ in xrange(iterations):
            writer(breadcrumb)
        f.flush()
        end = time.time()

        # Seek to the end so tell() reports the total number of bytes written.
        f.seek(0, os.SEEK_END)
        size = f.tell()

        elapsed_msecs = (end - start) * 1000
        stdout.write(
            "{:.2f} msecs, {:.4f} msecs/item\n"
            "{} bytes, {} bytes/item\n".format(
                elapsed_msecs,
                elapsed_msecs / iterations,
                size,
                # Explicit floor division keeps the integer result the
                # original `/` produced on Python 2 ints.
                size // iterations,
            )
        )

    # Run the single requested encoder, or every registered one in a
    # deterministic (sorted) order; a separate loop name avoids rebinding
    # the `encoder` parameter.
    selected = [encoder] if encoder is not None else sorted(encoders)
    for name in selected:
        test(name)
# Script entry point: let click parse the command line and run the benchmark.
if __name__ == '__main__':
    run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment