Created
December 8, 2017 01:47
get_buffer_bench.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import struct | |
try: | |
from time import perf_counter as clock | |
except ImportError: | |
from time import time as clock | |
import asyncio | |
import uvloop | |
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy()) | |
class FrameDecoder:
    """
    Framing layer mixin with custom buffering logic.

    Wire format: an 8-byte little-endian unsigned length header followed
    by that many payload bytes.

    The mixin implements ``get_buffer()`` / ``buffer_updated()`` so that a
    transport driving an ``asyncio.BufferedProtocol`` can read straight
    into the header/payload buffers.  It also implements
    ``data_received()`` so that it still decodes frames (with one extra
    copy) when combined with a plain ``asyncio.Protocol``.

    Subclasses must override ``message_received()``.
    """

    # Explicit byte order and width: the native 'L' code is only 4 bytes
    # on some platforms (e.g. Windows), which does not match an 8-byte
    # header buffer.  '<Q' is always 8 bytes.
    _HEADER = struct.Struct('<Q')

    def connection_made(self, transport):
        """Remember the transport and start waiting for a length header."""
        self.transport = transport
        self._msg_size = None  # None while reading a header
        self._new_buffer(self._HEADER.size)

    def _new_buffer(self, size):
        """Allocate a fresh receive buffer of ``size`` bytes."""
        self.buffer = bytearray(size)
        self.buffer_view = memoryview(self.buffer)
        self.pos = 0

    def get_buffer(self, sizehint=-1):
        """Return a writable view over the unfilled tail of the buffer.

        ``sizehint`` is accepted because buffered transports pass it; it
        is ignored since the buffer is sized by the framing state.
        """
        return self.buffer_view[self.pos:]

    def buffer_updated(self, nbytes):
        """Account for ``nbytes`` written into the view from get_buffer().

        Once the current buffer is full, either switch from header to
        payload, or hand the finished payload to ``message_received()``.
        """
        self.pos += nbytes
        if self.pos < len(self.buffer):
            return  # header or payload still incomplete

        if self._msg_size is None:
            size = self._HEADER.unpack(self.buffer)[0]
            if size:
                self._msg_size = size
                self._new_buffer(size)
                return
            # Zero-length payload: there is nothing left to read, so
            # deliver it now instead of handing out an empty buffer.
            msg = bytearray()
        else:
            msg = self.buffer

        # Reset to header state before the callback so the callback sees
        # a decoder that is ready for the next frame.
        self._msg_size = None
        self._new_buffer(self._HEADER.size)
        self.message_received(msg)

    def data_received(self, data):
        """Feed a received chunk through the same framing state machine.

        Used when the mixin is combined with a streaming protocol base
        rather than a buffered one.
        """
        pending = memoryview(data)
        while len(pending):
            target = self.get_buffer()
            count = min(len(target), len(pending))
            target[:count] = pending[:count]
            pending = pending[count:]
            self.buffer_updated(count)

    def message_received(self, msg):
        """Handle one complete payload (a ``bytearray``); must be overridden."""
        raise NotImplementedError

    def send_message(self, msg):
        """Write ``msg`` to the transport, prefixed with its length header."""
        self.transport.write(self._HEADER.pack(len(msg)))
        self.transport.write(msg)
class BenchServerProtocol(FrameDecoder, asyncio.BufferedProtocol):
    """Echo server protocol: sends every complete message back to the peer.

    Derives from ``asyncio.BufferedProtocol`` so the transport delivers
    incoming data through ``get_buffer()`` / ``buffer_updated()``.
    """

    def get_buffer(self, sizehint=-1):
        # Buffered transports pass a size hint; the framing layer sizes
        # its own buffers, so the hint is not forwarded.
        return FrameDecoder.get_buffer(self)

    def connection_lost(self, exc):
        """Report that the connection to the client ended."""
        print('The client closed the connection:', exc)

    def message_received(self, msg):
        """Log the size of the received message and echo it back."""
        print('server', len(msg))
        self.send_message(msg)
class BenchClientProtocol(FrameDecoder, asyncio.BufferedProtocol):
    """Client protocol that signals when one complete message has arrived.

    Derives from ``asyncio.BufferedProtocol`` so the transport delivers
    incoming data through ``get_buffer()`` / ``buffer_updated()``.
    """

    def __init__(self):
        # Set by message_received(); awaited by wait_until_complete().
        self._evt_done = asyncio.Event()

    def get_buffer(self, sizehint=-1):
        # Buffered transports pass a size hint; the framing layer sizes
        # its own buffers, so the hint is not forwarded.
        return FrameDecoder.get_buffer(self)

    def connection_lost(self, exc):
        """Report that the connection to the server ended."""
        print('The server closed the connection:', exc)

    def message_received(self, msg):
        """Log the size of the received message and mark the exchange done."""
        print('client', len(msg))
        self._evt_done.set()

    async def wait_until_complete(self):
        """Wait until a complete message has been received."""
        await self._evt_done.wait()
async def f():
    """Run the echo benchmark and print the achieved throughput.

    Starts an echo server on 127.0.0.1:8000, then for each iteration opens
    a new client connection, sends one 100 MB message, and waits for the
    echoed message to arrive.  The printed rate is payload bytes times
    iterations divided by elapsed wall time (connection setup included).
    """
    data = b"x" * (100 * 1000**2)  # 100 MB
    niters = 5

    loop = asyncio.get_event_loop()
    server = await loop.create_server(BenchServerProtocol, '127.0.0.1', 8000)
    try:
        start = clock()
        for _ in range(niters):
            transport, client = await loop.create_connection(
                BenchClientProtocol, '127.0.0.1', 8000)
            try:
                client.send_message(data)
                await client.wait_until_complete()
            finally:
                # Close each client connection once its round trip is
                # done so connections do not pile up across iterations.
                transport.close()
        end = clock()
    finally:
        # Stop listening even if an iteration failed.
        server.close()

    dt = end - start
    rate = len(data) * niters / dt
    print("duration: %s => rate: %d MB/s"
          % (dt, rate / 1e6))
if __name__ == "__main__":
    # Create the loop explicitly (through the installed event loop policy)
    # instead of relying on get_event_loop() to create one implicitly,
    # and make sure it is closed even if the benchmark raises.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(f())
    finally:
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment