Skip to content

Instantly share code, notes, and snippets.

@masci
Created December 1, 2016 13:26
Show Gist options
  • Save masci/7bd5c9d32f4308a227f18de62797b78a to your computer and use it in GitHub Desktop.
Save masci/7bd5c9d32f4308a227f18de62797b78a to your computer and use it in GitHub Desktop.
Encode/Decode Protobuf chained messages
import metric_pb2
from google.protobuf.internal.encoder import _VarintBytes
from google.protobuf.internal.decoder import _DecodeVarint32
import timeit
import random
from functools import partial
import io
N_RUNS = 10
def encode_pb(out_stream, values):
    """
    Write one Metric protobuf message per entry of `values` to
    `out_stream`, a binary file-like object.

    Every message is preceded by its byte size encoded as a varint, so
    that consecutive messages can be told apart when reading back.
    """
    for sample in values:
        metric = metric_pb2.Metric()
        metric.name, metric.type = 'sys.cpu', 'gauge'
        metric.value = sample
        metric.tags.extend(['my_tag', 'foo:bar'])

        # Length prefix first, then the serialized message itself.
        length_prefix = _VarintBytes(metric.ByteSize())
        out_stream.write(length_prefix)
        out_stream.write(metric.SerializeToString())
def decode_pb(in_stream):
    """
    Read the whole input stream and parse the chain of size-prefixed
    Metric messages it contains.

    A single Metric instance is reused for every message; the parsed
    result is not returned.
    """
    data = in_stream.read()
    total = len(data)
    metric = metric_pb2.Metric()

    cursor = 0
    while cursor < total:
        # The varint prefix gives the size of the message that follows and
        # tells us where the message body starts.
        size, body_start = _DecodeVarint32(data, cursor)
        body_end = body_start + size
        metric.ParseFromString(data[body_start:body_end])
        cursor = body_end
        # do smth with metric
def encode_text(out_stream, values):
    """
    Write one comma separated text line per entry of `values` to
    `out_stream`, a text file-like object.

    Name, type and tags are fixed; only the metric value changes from
    one line to the next.
    """
    line_template = 'sys.cpu,gauge,[my_tag;foo:bar],{}\n'
    for sample in values:
        out_stream.write(line_template.format(sample))
def decode_text(in_stream):
    """
    Parse metrics from the input stream and represent each
    metric with a dictionary.

    Every line is expected to have the form
    ``name,type,[tag1;tag2],value``. The dictionary built for a line
    holds the name, the type, the list of tags (without the surrounding
    square brackets) and the value converted to float. The dictionaries
    are not collected or returned.

    Raises ValueError if a line does not have exactly four comma
    separated fields or if its value is not a valid float.
    """
    # Iterate the stream lazily instead of materialising every line first.
    for line in in_stream:
        name, metric_type, tags, value = line.split(',')
        # Drop the '[' / ']' wrapper so the tags come out clean; an empty
        # tag list ('[]') yields [] rather than [''].
        tag_body = tags.strip('[]')
        my_metric = {
            'name': name,
            'type': metric_type,
            'tags': tag_body.split(';') if tag_body else [],
            'value': float(value),
        }
        # do smth with my_metric
def timeit_encode(values):
    """
    Time both encoders on `values` and print, for each of them, the
    average duration of one run in milliseconds over N_RUNS runs.
    """
    # (report template, output stream factory, encoder) per format.
    contenders = (
        ('Protobuf: {:6.3f}ms', io.BytesIO, encode_pb),
        ('Text: {:6.3f}ms', io.StringIO, encode_text),
    )
    for report, make_stream, encoder in contenders:
        # One stream per format, shared by all the runs of that format.
        sink = make_stream()
        timer = timeit.Timer(partial(encoder, sink, values))
        total_seconds = timer.timeit(N_RUNS)
        print(report.format(total_seconds / N_RUNS * 1000))
def timeit_decode(bin_stream, text_stream):
    """
    Time both decoders and print, for each of them, the average duration
    of one run in milliseconds over N_RUNS runs, followed by the cost of
    an empty callable as a baseline.

    `bin_stream` and `text_stream` must be seekable file-like objects
    holding the protobuf and the text encoded metrics respectively.
    """
    def rewound(decoder, stream):
        # The decoders consume the stream up to EOF. Without rewinding,
        # only the first of the N_RUNS runs would decode anything and the
        # reported average would be far too low.
        def run():
            stream.seek(0)
            decoder(stream)
        return run

    t = timeit.Timer(rewound(decode_pb, bin_stream))
    print('Protobuf: {:6.3f}ms'.format(t.timeit(N_RUNS) / N_RUNS * 1000))
    t = timeit.Timer(rewound(decode_text, text_stream))
    print('Text: {:6.3f}ms'.format(t.timeit(N_RUNS) / N_RUNS * 1000))
    # Baseline: overhead of timing a callable that does nothing.
    t = timeit.Timer(lambda: None)
    print('Noop: {:6.6f}ms'.format(t.timeit(N_RUNS) / N_RUNS * 1000))
if __name__ == '__main__':
    # Run the encode/decode benchmark for increasingly large batches.
    for batch in (10**3, 10**4, 10**5, 10**6):
        values = [round(random.random(), 2) for _ in range(batch)]

        print('Encoding {} metrics'.format(batch))
        timeit_encode(values)

        print('Decoding {} metrics'.format(batch))
        # Write both encodings to disk, then load them back into memory
        # streams that are handed to the decoders.
        with open('out.bin', 'wb') as f:
            encode_pb(f, values)
        with open('out.txt', 'w') as f:
            encode_text(f, values)
        with open('out.bin', 'rb') as fb:
            bin_stream = io.BytesIO(fb.read())
        with open('out.txt') as ft:
            str_stream = io.StringIO(ft.read())

        # Decode from the in-memory text stream: the file object `ft` has
        # already been read to the end and closed at this point.
        timeit_decode(bin_stream, str_stream)
        print("=" * 80)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment