Instantly share code, notes, and snippets.

Embed
What would you like to do?
get_buffer_bench.py
import struct
try:
from time import perf_counter as clock
except ImportError:
from time import time as clock
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
class FrameDecoder:
    """
    Framing layer mixin with custom buffering logic.

    Wire format: every message is preceded by a fixed 8-byte little-endian
    unsigned length header, followed by exactly that many payload bytes.

    The mixin implements the buffered-protocol receive pair
    (``get_buffer`` / ``buffer_updated``) so incoming bytes are written
    directly into a buffer sized for the current header or payload.
    Subclasses must override ``message_received``.
    """

    # Explicit size and byte order: native 'L' is 4 bytes on some platforms,
    # which would not match the 8-byte header buffer used here.
    _HEADER = struct.Struct('<Q')

    def connection_made(self, transport):
        """Store the transport and start waiting for the first header."""
        self.transport = transport
        self._msg_size = None  # None while a header is being received
        self._new_buffer(self._HEADER.size)

    def _new_buffer(self, size):
        """Allocate a fresh receive buffer of *size* bytes and rewind."""
        self.buffer = bytearray(size)
        self.buffer_view = memoryview(self.buffer)
        self.pos = 0

    def get_buffer(self, sizehint=-1):
        """Return a writable view over the unfilled part of the buffer.

        *sizehint* is accepted because the buffered-protocol API passes it;
        it is ignored, as the buffer size is dictated by the framing.
        """
        return self.buffer_view[self.pos:]

    def buffer_updated(self, nbytes):
        """Record that *nbytes* were written into the buffer.

        When a header completes, switch to a payload buffer; when a payload
        completes, reset to header state and call ``message_received``.
        """
        self.pos += nbytes
        if self.pos < len(self.buffer):
            # Current header/payload is still incomplete.
            return

        if self._msg_size is None:
            (size,) = self._HEADER.unpack(self.buffer)
            if size:
                self._msg_size = size
                self._new_buffer(size)
                return
            # Zero-length frame: there is no payload to wait for, and an
            # empty receive buffer must never be handed to the transport,
            # so deliver the empty message right away.
            msg = bytearray()
        else:
            msg = self.buffer

        # Reset state before the callback so the handler sees a decoder
        # that is ready for the next frame.
        self._msg_size = None
        self._new_buffer(self._HEADER.size)
        self.message_received(msg)

    def message_received(self, msg):
        """Handle one complete message; must be overridden by subclasses."""
        raise NotImplementedError

    def send_message(self, msg):
        """Write *msg* to the transport, prefixed with its length header."""
        self.transport.write(self._HEADER.pack(len(msg)))
        self.transport.write(msg)
class BenchServerProtocol(FrameDecoder, asyncio.BufferedProtocol):
    """Echo server: sends every received message back to the peer.

    Derives from ``asyncio.BufferedProtocol`` so the transport drives the
    ``get_buffer`` / ``buffer_updated`` pair supplied by ``FrameDecoder``;
    with plain ``asyncio.Protocol`` the transport would call
    ``data_received`` instead and no frame would ever be decoded.
    """

    def connection_lost(self, exc):
        """Report that the connection went away (*exc* is None on clean close)."""
        print('The client closed the connection:', exc)

    def message_received(self, msg):
        """Log the size of the message and echo it back."""
        print('server', len(msg))
        self.send_message(msg)
class BenchClientProtocol(FrameDecoder, asyncio.BufferedProtocol):
    """Client side of the benchmark: waits for a single reply message.

    Derives from ``asyncio.BufferedProtocol`` so the transport drives the
    ``get_buffer`` / ``buffer_updated`` pair supplied by ``FrameDecoder``;
    with plain ``asyncio.Protocol`` the transport would call
    ``data_received`` instead and the reply would never be detected.
    """

    def __init__(self):
        # Set once the first complete message has been received.
        self._evt_done = asyncio.Event()

    def connection_lost(self, exc):
        """Report that the connection went away (*exc* is None on clean close)."""
        print('The server closed the connection:', exc)

    def message_received(self, msg):
        """Log the size of the reply and wake up ``wait_until_complete``."""
        print('client', len(msg))
        self._evt_done.set()

    async def wait_until_complete(self):
        """Block until a complete message has been received."""
        await self._evt_done.wait()
async def f():
    """Run the echo benchmark and print the measured throughput.

    Starts an echo server on 127.0.0.1:8000, then ``niters`` times opens a
    new client connection, sends a 100 MB message and waits for a reply.
    The reported rate counts the payload once per iteration.
    """
    data = b"x" * (100 * 1000**2)  # 100 MB
    niters = 5

    loop = asyncio.get_event_loop()
    server = await loop.create_server(BenchServerProtocol, '127.0.0.1', 8000)
    try:
        start = clock()
        for _ in range(niters):
            transport, client = await loop.create_connection(
                BenchClientProtocol, '127.0.0.1', 8000)
            try:
                client.send_message(data)
                await client.wait_until_complete()
            finally:
                # Each iteration opens its own connection; close it so
                # sockets do not pile up across iterations.
                transport.close()
        end = clock()
    finally:
        # Release the listening socket even if an iteration failed.
        server.close()
        await server.wait_closed()

    dt = end - start
    rate = len(data) * niters / dt
    print("duration: %s => rate: %d MB/s"
          % (dt, rate / 1e6))
if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() with no current
    # loop is deprecated. new_event_loop() still goes through the installed
    # event loop policy.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(f())
    finally:
        # Close the loop even when the benchmark raises.
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment