Skip to content

Instantly share code, notes, and snippets.

@rwarren
Forked from 1st1/get_buffer_bench.py
Created March 23, 2020 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rwarren/4ea2cc4c6cb1b0153dbc796c130ac229 to your computer and use it in GitHub Desktop.
Save rwarren/4ea2cc4c6cb1b0153dbc796c130ac229 to your computer and use it in GitHub Desktop.
get_buffer_bench.py
import struct
try:
from time import perf_counter as clock
except ImportError:
from time import time as clock
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
class FrameDecoder:
"""
Framing layer mixin with custom buffering logic.
"""
def connection_made(self, transport):
self.transport = transport
self._msg_size = None
self._new_buffer(8)
def _new_buffer(self, size):
self.buffer = bytearray(size)
self.buffer_view = memoryview(self.buffer)
self.pos = 0
def get_buffer(self, sizehint):
return self.buffer_view[self.pos:]
def buffer_updated(self, nbytes):
self.pos += nbytes
if self._msg_size is None:
if self.pos == 8:
self._msg_size = struct.unpack('L', self.buffer)[0]
self._new_buffer(self._msg_size)
else:
if self.pos == self._msg_size:
msg = self.buffer
self._new_buffer(8)
self._msg_size = None
self.message_received(msg)
def message_received(self, msg):
raise NotImplementedError
def send_message(self, msg):
self.transport.write(struct.pack('L', len(msg)))
self.transport.write(msg)
class BenchServerProtocol(FrameDecoder, asyncio.BufferedProtocol):
def connection_lost(self, exc):
print('The client closed the connection:', exc)
def message_received(self, msg):
print('server', len(msg))
self.send_message(msg)
class BenchClientProtocol(FrameDecoder, asyncio.BufferedProtocol):
def __init__(self):
self._evt_done = asyncio.Event()
def connection_lost(self, exc):
print('The server closed the connection:', exc)
def message_received(self, msg):
print('client', len(msg))
self._evt_done.set()
async def wait_until_complete(self):
await self._evt_done.wait()
async def f():
data = b"x" * (100 * 1000**2) # 100 MB
niters = 5
loop = asyncio.get_event_loop()
server = await loop.create_server(BenchServerProtocol, '127.0.0.1', 8000)
start = clock()
for i in range(niters):
_, client = await loop.create_connection(BenchClientProtocol,
'127.0.0.1', 8000)
client.send_message(data)
await client.wait_until_complete()
end = clock()
server.close()
dt = end - start
rate = len(data) * niters / dt
print("duration: %s => rate: %d MB/s"
% (dt, rate / 1e6))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(f())
loop.close()
@rwarren
Copy link
Author

rwarren commented Mar 23, 2020

This just a minor edit to make it work with the (currently as of 3.8.2) provisional asyncio.BufferedProtocol API.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment