Skip to content

Instantly share code, notes, and snippets.

@smurfix
Last active September 7, 2021 08:33
Show Gist options
  • Save smurfix/204fcc459149faa1b29e51daf22ce5d1 to your computer and use it in GitHub Desktop.
Save smurfix/204fcc459149faa1b29e51daf22ce5d1 to your computer and use it in GitHub Desktop.
example code for a multiplexing client/server protocol
#!/usr/bin/python3
"""
Example code for a multiplexing client/server protocol.

Missing:

* server capacity management
* actual test cases for error propagation and cancellation
* handling badly formatted messages without crashing the server
* sending more than one request or reply per interaction
* most likely a few other edge cases

Message format:

Messages are wrapped with `msgpack` because it's self-delimiting and very
easy to work with. Each message is a dict with these fields:

* id: sequence number chosen by the client and echoed back by the server
* msg: the actual data of the request / reply
* err: server-side error, forwarded to the client
* kill: sequence number of a to-be-cancelled request, sent to the server
"""
import trio
try:
from contextlib import asynccontextmanager
except ImportError:
from async_generator import asynccontextmanager
import msgpack
from functools import partial
from random import random,randint
from outcome import Error,Value
import logging
# Module-wide logger; the "C<" / "C>" / "S<" / "S>" traces go through it.
logger = logging.getLogger(__name__)

# TCP port used by the sample client/server pair.
PORT = 50123
class RemoteError(RuntimeError):
    """Raised on the client when the server's reply carries an ``err`` field."""
########### Server ###############
class ServerHandler:
    """
    Server side of one client connection.

    ``serve`` reads msgpack-encoded messages from ``sock`` and starts one
    task per message, so requests are processed in parallel. Each task
    calls ``process`` and sends the result (or the error) back to the
    client, tagged with the request's ``id``.

    Subclasses override ``process``.

    TODO: no capacity management yet.
    """

    def __init__(self, sock):
        self.lock = trio.Lock()  # for serializing sending replies
        self.jobs = dict()  # request id -> CancelScope, for cancelling jobs
        self.sock = sock

    async def process(self, msg):
        """
        The main method of this class: process one incoming message to
        generate one reply.

        ``msg`` is the content of the request's ``msg`` field; the return
        value is sent back as the reply's ``msg`` field.
        """
        raise RuntimeError("Override me!")

    async def _do_reply(self, msg):
        """
        Create and send the reply for one message.

        A message with a ``msg`` field is a request: it is handed to
        ``process`` and answered with ``{'id', 'msg'}`` on success or
        ``{'id', 'err'}`` on failure. Any other message is treated as a
        kill request and cancels the job named by its ``kill`` field.

        Nothing is sent when the message has no (or a zero) ``id``, or when
        the request was cancelled by a kill.
        """
        logger.debug("S> %s", msg)
        if not isinstance(msg, dict):
            # Not a protocol message at all; dropping it keeps one bad
            # packet from taking down the connection's nursery.
            logger.warning("S? ignoring malformed message: %r", msg)
            return

        seq = msg.get('id')
        ex = None
        registered = False
        try:
            # We save this request's scope so that a later kill message
            # can cancel it.
            with trio.CancelScope() as cs:
                if 'msg' in msg:
                    if seq:
                        self.jobs[seq] = cs
                        registered = True
                    reply = {'msg': await self.process(msg['msg'])}
                else:
                    self.jobs[msg['kill']].cancel()
                    # Only sent if the kill carries a non-zero id.
                    reply = {'msg': msg}
        except Exception as exc:
            reply = {'err': repr(exc)}
        except BaseException as exc:
            # Report on a best-effort basis, then re-raise below.
            reply = {'err': repr(exc)}
            ex = exc
        else:
            if cs.cancelled_caught:
                # Killed on the client's request: there is no result, so
                # don't send a made-up reply.
                return
        finally:
            # Remove only our own entry; a duplicate id may have replaced it.
            if registered and self.jobs.get(seq) is cs:
                del self.jobs[seq]

        if seq:
            reply['id'] = seq
            logger.debug("S< %s", reply)
            try:
                data = msgpack.packb(reply)
            except Exception as exc:
                # The result could not be serialized; tell the client
                # instead of crashing the connection.
                data = msgpack.packb({'id': seq, 'err': repr(exc)})
            async with self.lock:
                try:
                    await self.sock.send_all(data)
                except trio.BrokenResourceError:
                    pass  # client died. Oh well
        if ex is not None:
            raise ex

    async def serve(self, task_status=trio.TASK_STATUS_IGNORED):
        """
        Read messages from the socket and run ``_do_reply`` on each one.

        Returns when the client closes the connection. Requests that are
        still running at a clean end-of-stream are allowed to finish; on a
        broken connection or undecodable input they are cancelled.
        """
        task_status.started()
        unpacker = msgpack.Unpacker(raw=False)
        async with trio.open_nursery() as n:
            while True:
                try:
                    data = await self.sock.receive_some(4096)
                except trio.BrokenResourceError:
                    # client went away, stop work
                    n.cancel_scope.cancel()
                    return
                if not data:
                    # End of stream. Without this check the loop would
                    # spin on empty reads forever.
                    return
                try:
                    unpacker.feed(data)
                    for msg in unpacker:
                        # TODO: add capacity management
                        n.start_soon(self._do_reply, msg)
                except Exception:
                    # The byte stream cannot be decoded, so message framing
                    # is lost; give up on this connection only.
                    logger.exception("S? undecodable data, closing connection")
                    n.cancel_scope.cancel()
                    return
########### Client ###############
class Client:
    """
    Client side of the multiplexing protocol.

    Open the connection with ``connect``, then call ``ask`` from any number
    of tasks. Replies are matched to requests by sequence number, so they
    may arrive in any order.
    """

    def __init__(self):
        self._seq = 0  # last sequence number handed out
        # seq -> trio.Event while the request is pending; replaced by an
        # outcome (Value / Error) once the reply has arrived.
        self._reply = {}
        # Unbuffered queue of encoded requests, drained by ``_send``.
        self._send_w, self._send_r = trio.open_memory_channel(0)
        # Both ``_send`` and the cancel path of ``ask`` write to the stream;
        # trio rejects overlapping send_all() calls, so serialize them.
        self._send_lock = trio.Lock()

    @asynccontextmanager
    async def connect(self):
        """
        Async context manager that connects us to a server.

        Starts the receiver and sender tasks; both are cancelled and the
        stream is closed when the context exits.
        """
        async with trio.open_nursery() as n:
            async with await trio.open_tcp_stream("127.0.0.1", PORT) as s:
                self.s = s
                await n.start(self._recv)
                await n.start(self._send)
                try:
                    yield self
                finally:
                    del self.s
                    n.cancel_scope.cancel()

    async def _write(self, data):
        """
        Send already-encoded bytes, one writer at a time.
        """
        async with self._send_lock:
            await self.s.send_all(data)

    async def _send(self, task_status=trio.TASK_STATUS_IGNORED):
        """
        Packet sender loop: forwards queued requests to the server.
        """
        task_status.started()
        async for data in self._send_r:
            await self._write(data)

    async def _recv(self, task_status=trio.TASK_STATUS_IGNORED):
        """
        Packet receiver loop.

        Raises ``trio.BrokenResourceError`` when the server closes the
        connection, which tears down the ``connect`` context.
        """
        task_status.started()
        unpacker = msgpack.Unpacker(raw=False)
        while hasattr(self, 's'):
            data = await self.s.receive_some(4096)
            if not data:
                # End of stream. Looping on would spin on empty reads while
                # every pending request waits forever.
                raise trio.BrokenResourceError("server closed the connection")
            unpacker.feed(data)
            for msg in unpacker:
                logger.debug("C> %s", msg)
                try:
                    self._dispatch(msg)
                except Exception:
                    # One bad reply must not stop delivery of the others.
                    logger.exception("C?")

    def _dispatch(self, msg):
        """
        This code takes a reply and forwards it to the correct client task.

        Replies for requests that are no longer waiting (cancelled or
        already answered) are dropped.
        """
        seq = msg['id']
        evt = self._reply.get(seq)
        if not isinstance(evt, trio.Event):
            logger.debug("C? dropping reply for unknown request %r", seq)
            return
        if 'msg' in msg:
            self._reply[seq] = Value(msg['msg'])
        else:
            # Always wake the waiter, even if 'err' is missing as well.
            err = msg.get('err', 'malformed reply: %r' % (msg,))
            self._reply[seq] = Error(RemoteError(err))
        evt.set()

    async def ask(self, msg):
        """
        Our main entry point. Sends a request to the server and waits for a
        reply. If cancelled, sends a Kill request to the server.

        Returns the reply's payload, or raises ``RemoteError`` if the server
        reported an error.
        """
        seq = self._seq + 1
        # Wrap the message, add seq number. Encoding here means an
        # unserializable request fails in the caller, not in the sender task.
        msg = {'id': seq, 'msg': msg}
        data = msgpack.packb(msg)
        self._seq = seq

        self._reply[seq] = evt = trio.Event()
        sent = False
        try:
            logger.debug("C< %s", msg)
            await self._send_w.send(data)
            sent = True
            await evt.wait()
        except trio.Cancelled:
            if sent:
                # Try to tell the server to kill the request
                with trio.move_on_after(2, shield=True):
                    try:
                        await self._write(msgpack.packb({'id': 0, 'kill': seq}))
                    except (trio.BrokenResourceError, trio.ClosedResourceError, AttributeError):
                        pass  # the connection is already dead
            raise  # Never swallow trio.Cancelled!
        else:
            return self._reply[seq].unwrap()
        finally:
            del self._reply[seq]
########### Sample code: client ###############
async def asker(c, x, y):
    """
    Sample client task: send between two and five requests through ``c``
    and check that each reply echoes the request with "Hello back" as data,
    pausing for a random fraction of a second after each exchange.
    """
    rounds = randint(2, 5)
    for z in range(rounds):
        request = {'x': x, 'y': y, 'z': z, 'data': 'Hello'}
        res = await c.ask(request)
        expected_fields = (('data', "Hello back"), ('x', x), ('y', y), ('z', z))
        for key, expected in expected_fields:
            assert res[key] == expected
        await trio.sleep(random())
async def client(x):
    """
    A simple client: open one connection and run three independent
    ``asker`` tasks over it, numbered 1 to 3.
    """
    async with Client().connect() as c:
        async with trio.open_nursery() as n:
            for y in (1, 2, 3):
                n.start_soon(asker, c, x, y)
########### Sample code: server ###############
class MyHandler(ServerHandler):
    """Sample server: echoes each request with ' back' appended to its data."""

    async def process(self, msg):
        """
        Append ' back' to the message's 'data' element, wait for up to a
        third of a second, and return the (modified) message.
        """
        msg['data'] += ' back'
        delay = random() / 3
        await trio.sleep(delay)
        return msg
########### Sample code: tying it all together ###############
async def main():
    """
    Main code. Fairly simplistic.

    Starts the sample server on ``PORT``, runs three clients against it in
    parallel, and shuts the server down once they have all finished.
    """
    async def server_handler(sock):
        # One handler instance per accepted connection.
        await MyHandler(sock).serve()

    async with trio.open_nursery() as n:
        # Start a server (and wait for it to be established).
        # Use PORT so the server and Client.connect() cannot drift apart.
        await n.start(partial(trio.serve_tcp, server_handler, PORT, host="127.0.0.1"))

        # Start some clients, in parallel.
        # This part could (obviously) run in another process, or on a different host.
        async with trio.open_nursery() as nn:
            for x in (1, 2, 3):
                nn.start_soon(client, x)

        # All clients have finished: kill the server.
        n.cancel_scope.cancel()
if __name__ == "__main__":
    # Debug level shows the full request/reply trace of both sides.
    logging.basicConfig(level=logging.DEBUG)
    trio.run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment