Skip to content

Instantly share code, notes, and snippets.

@pitrou
Created October 19, 2017 10:59
Show Gist options
  • Save pitrou/245e24de52dec34e03cfc4148c001466 to your computer and use it in GitHub Desktop.
Save pitrou/245e24de52dec34e03cfc4148c001466 to your computer and use it in GitHub Desktop.
import struct
try:
from time import perf_counter as clock
except ImportError:
from time import time as clock
from tornado.ioloop import IOLoop
from tornado import gen
#import asyncio
#import uvloop
#asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
import numpy as np
from distributed.comm import connect, listen
from distributed.protocol import (to_serialize, Serialized, serialize,
deserialize)
@gen.coroutine
def server_handle_comm(comm):
    """Serve one connection: read a 'ping' message and echo it back as 'pong'.

    Only the ``'op'`` field of the received message is changed; the rest of
    the message is written back as received.

    The comm is closed when the exchange is over, including when reading,
    checking or writing the message raises, so a failed exchange does not
    leave the connection open.
    """
    try:
        msg = yield comm.read()
        # Sanity check: the benchmark client only ever sends 'ping'.
        assert msg['op'] == 'ping'
        msg['op'] = 'pong'
        yield comm.write(msg)
    finally:
        yield comm.close()
@gen.coroutine
def run_bench(nbytes, niters):
    """Benchmark ping/pong round-trips of a serialized payload over local TCP.

    Starts a listener on 127.0.0.1 handled by ``server_handle_comm``, then
    performs ``niters`` sequential round-trips.  Each round-trip opens a new
    connection, writes a 'ping' message carrying the payload, reads the
    reply, checks it, and closes the connection.  One dot is printed per
    completed round-trip, followed by the elapsed time and the throughput.

    The timed region covers connection setup, the exchange and the close of
    every iteration.  The reported rate counts the payload once per
    round-trip, although the handler sends it back as well.

    Parameters
    ----------
    nbytes : int
        Size of the random uint8 payload, in bytes.
    niters : int
        Number of round-trips to perform.
    """
    #data = b"x" * nbytes
    # The upper bound of randint is exclusive, so 256 is needed to cover the
    # full uint8 range (0..255).
    data = np.random.randint(0, 256, size=nbytes, dtype=np.uint8)
    # Serialize once, outside the timed loop; the same item is sent each time.
    item = Serialized(*serialize(data))

    listener = listen('tcp://127.0.0.1', server_handle_comm, deserialize=False)
    listener.start()
    try:
        start = clock()
        for _ in range(niters):
            comm = yield connect(listener.contact_address, deserialize=False)
            try:
                yield comm.write({'op': 'ping', 'item': item})
                msg = yield comm.read()
                assert msg['op'] == 'pong'
                # The payload is expected to come back still wrapped as
                # Serialized, i.e. not deserialized on the way.
                assert isinstance(msg['item'], Serialized)
            finally:
                # Close the connection even if the exchange failed.
                yield comm.close()
            print('.', end='', flush=True)
        # Stop the clock before emitting the trailing newline so that the
        # final print is not part of the measurement.
        end = clock()
        print()
    finally:
        # Always release the listening socket, also on failure.
        listener.stop()

    dt = end - start
    rate = len(data) * niters / dt
    print("duration: %s => rate: %d MB/s"
          % (dt, rate / 1e6))
@gen.coroutine
def f():
    """Benchmark entry point: run ten round-trips of a 100 MB payload."""
    payload_size = 100 * 1000**2  # 100 MB
    rounds = 10
    # Alternative configurations moving the same total volume:
    #   run_bench(10 * 1000**2, 100)   # 10 MB
    #   run_bench(1 * 1000**2, 1000)   # 1 MB
    yield run_bench(payload_size, rounds)
if __name__ == "__main__":
    # Drive the benchmark coroutine to completion on a newly created IOLoop.
    loop = IOLoop()
    loop.run_sync(f)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment