import metric_pb2 | |
from google.protobuf.internal.encoder import _VarintBytes | |
from google.protobuf.internal.decoder import _DecodeVarint32 | |
import timeit | |
import random | |
from functools import partial | |
import io | |
N_RUNS = 10 | |
def encode_pb(out_stream, values): | |
""" | |
values contains metric values, out is a file-like | |
object. | |
""" | |
for v in values: | |
my_metric = metric_pb2.Metric() | |
my_metric.name = 'sys.cpu' | |
my_metric.type = 'gauge' | |
my_metric.value = v | |
my_metric.tags.extend(['my_tag', 'foo:bar']) | |
size = my_metric.ByteSize() | |
out_stream.write(_VarintBytes(size)) | |
out_stream.write(my_metric.SerializeToString()) | |
def decode_pb(in_stream): | |
""" | |
Parse metrics from the input stream. | |
""" | |
buf = in_stream.read() | |
n = 0 | |
my_metric = metric_pb2.Metric() | |
while n < len(buf): | |
msg_len, new_pos = _DecodeVarint32(buf, n) | |
n = new_pos | |
msg_buf = buf[n:n+msg_len] | |
n += msg_len | |
my_metric.ParseFromString(msg_buf) | |
# do smth with my_metric | |
def encode_text(out_stream, values): | |
""" | |
`values` contains metric values, out is a file-like | |
object. | |
""" | |
for v in values: | |
my_metric = 'sys.cpu,gauge,[my_tag;foo:bar],{}\n'.format(v) | |
out_stream.write(my_metric) | |
# do smth with my_metric | |
def decode_text(in_stream): | |
""" | |
Parse metrics from the input stream and represent each | |
metric with a dictionary. | |
""" | |
for m in in_stream.readlines(): | |
my_metric = {} | |
name, t, tags, value = m.split(',') | |
my_metric['name'] = name | |
my_metric['tags'] = [t for t in tags.split(';')] | |
my_metric['value'] = float(value) | |
def timeit_encode(values): | |
# Protobuf | |
out = io.BytesIO() | |
t = timeit.Timer(partial(encode_pb, out, values)) | |
print('Protobuf: {:6.3f}ms'.format(t.timeit(N_RUNS) / N_RUNS * 1000)) | |
# Text | |
out = io.StringIO() | |
t = timeit.Timer(partial(encode_text, out, values)) | |
print('Text: {:6.3f}ms'.format(t.timeit(N_RUNS) / N_RUNS * 1000)) | |
def timeit_decode(bin_stream, text_stream): | |
t = timeit.Timer(partial(decode_pb, bin_stream)) | |
print('Protobuf: {:6.3f}ms'.format(t.timeit(N_RUNS) / N_RUNS * 1000)) | |
t = timeit.Timer(partial(decode_text, text_stream)) | |
print('Text: {:6.3f}ms'.format(t.timeit(N_RUNS) / N_RUNS * 1000)) | |
t = timeit.Timer(lambda: None) | |
print('Noop: {:6.6f}ms'.format(t.timeit(N_RUNS) / N_RUNS * 1000)) | |
if __name__ == '__main__': | |
for batch in (10**3, 10**4, 10**5, 10**6): | |
values = [round(random.random(), 2) for _ in range(batch)] | |
print('Encoding {} metrics'.format(batch)) | |
timeit_encode(values) | |
print('Decoding {} metrics'.format(batch)) | |
with open('out.bin', 'wb') as f: | |
encode_pb(f, values) | |
with open('out.txt', 'w') as f: | |
encode_text(f, values) | |
with open('out.bin', 'rb') as fb: | |
bin_stream = io.BytesIO(fb.read()) | |
with open('out.txt') as ft: | |
str_stream = io.StringIO(ft.read()) | |
timeit_decode(bin_stream, ft) | |
print("=" * 80) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment