get_buffer_bench.py
import struct | |
try: | |
from time import perf_counter as clock | |
except ImportError: | |
from time import time as clock | |
import asyncio | |
import uvloop | |
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) | |
class FrameDecoder: | |
""" | |
Framing layer mixin with custom buffering logic. | |
""" | |
def connection_made(self, transport): | |
self.transport = transport | |
self._msg_size = None | |
self._new_buffer(8) | |
def _new_buffer(self, size): | |
self.buffer = bytearray(size) | |
self.buffer_view = memoryview(self.buffer) | |
self.pos = 0 | |
def get_buffer(self): | |
return self.buffer_view[self.pos:] | |
def buffer_updated(self, nbytes): | |
self.pos += nbytes | |
if self._msg_size is None: | |
if self.pos == 8: | |
self._msg_size = struct.unpack('L', self.buffer)[0] | |
self._new_buffer(self._msg_size) | |
else: | |
if self.pos == self._msg_size: | |
msg = self.buffer | |
self._new_buffer(8) | |
self._msg_size = None | |
self.message_received(msg) | |
def message_received(self, msg): | |
raise NotImplementedError | |
def send_message(self, msg): | |
self.transport.write(struct.pack('L', len(msg))) | |
self.transport.write(msg) | |
class BenchServerProtocol(FrameDecoder, asyncio.Protocol): | |
def connection_lost(self, exc): | |
print('The client closed the connection:', exc) | |
def message_received(self, msg): | |
print('server', len(msg)) | |
self.send_message(msg) | |
class BenchClientProtocol(FrameDecoder, asyncio.Protocol): | |
def __init__(self): | |
self._evt_done = asyncio.Event() | |
def connection_lost(self, exc): | |
print('The server closed the connection:', exc) | |
def message_received(self, msg): | |
print('client', len(msg)) | |
self._evt_done.set() | |
async def wait_until_complete(self): | |
await self._evt_done.wait() | |
async def f(): | |
data = b"x" * (100 * 1000**2) # 100 MB | |
niters = 5 | |
loop = asyncio.get_event_loop() | |
server = await loop.create_server(BenchServerProtocol, '127.0.0.1', 8000) | |
start = clock() | |
for i in range(niters): | |
_, client = await loop.create_connection(BenchClientProtocol, | |
'127.0.0.1', 8000) | |
client.send_message(data) | |
await client.wait_until_complete() | |
end = clock() | |
server.close() | |
dt = end - start | |
rate = len(data) * niters / dt | |
print("duration: %s => rate: %d MB/s" | |
% (dt, rate / 1e6)) | |
if __name__ == "__main__": | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(f()) | |
loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment