Skip to content

Instantly share code, notes, and snippets.

@decentral1se
Last active February 4, 2020 14:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save decentral1se/ee7c7607df3cbcde8a03073dbbc5cc91 to your computer and use it in GitHub Desktop.
Save decentral1se/ee7c7607df3cbcde8a03073dbbc5cc91 to your computer and use it in GitHub Desktop.
trio_exc.py
import pytest
import trio
from trio.testing import memory_stream_pair
from muxdemux import open_protocol
@pytest.fixture()
def stream():
    """Provide the pair returned by ``memory_stream_pair()`` for one test."""
    pair = memory_stream_pair()
    return pair
@pytest.fixture()
def lstream(stream):
    """Provide element 0 of the ``stream`` pair (the "left" end)."""
    left_end = stream[0]
    return left_end
@pytest.fixture()
def rstream(stream):
    """Provide element 1 of the ``stream`` pair (the "right" end).

    Uses ``return`` rather than ``yield``: there is no teardown to run after
    the test, and this keeps the fixture parallel to ``lstream``.
    """
    return stream[1]
@pytest.fixture()
async def lproto(lstream):
    """Yield a protocol with id 1 opened on ``lstream``.

    The nursery and the protocol context both stay open while the test
    runs and are exited, innermost first, once the test is done.
    """
    async with trio.open_nursery() as nursery, open_protocol(
        1, lstream, nursery
    ) as proto:
        yield proto
@pytest.fixture()
async def rproto(rstream):
    """Yield a protocol with id 2 opened on ``rstream``.

    The nursery and the protocol context both stay open while the test
    runs and are exited, innermost first, once the test is done.
    """
    async with trio.open_nursery() as nursery, open_protocol(
        2, rstream, nursery
    ) as proto:
        yield proto
async def test_proto_async_usage(lstream, rstream):
    """``open_protocol`` can be entered and exited inside a nursery."""
    async with trio.open_nursery() as nursery, open_protocol(
        1, lstream, nursery
    ):
        # Nothing to do: entering and leaving the context is the test.
        pass
async def test_wire_up_two_protos(lproto, rproto):
    """Requesting both protocol fixtures together must set up and tear down."""
async def test_push_one_way(lproto, rproto):
    """A byte pushed through ``lproto`` is readable from ``rproto``'s stream."""
    await lproto.push(b"a")
    received = await rproto.stream.receive_some(1)
    assert received == b"a"
async def test_push_two_ways(lproto, rproto):
    """Bytes can be pushed in each direction, one after the other."""
    # Left to right.
    await lproto.push(b"a")
    from_left = await rproto.stream.receive_some(1)
    assert from_left == b"a"

    # Right to left.
    await rproto.push(b"b")
    from_right = await lproto.stream.receive_some(1)
    assert from_right == b"b"
async def test_handles_lproto_broken_resource(lproto, rproto):
    """Close ``rproto``'s stream while ``lproto`` keeps pushing to it.

    NOTE(review): judging by the test name, the sender is expected to hit
    ``BrokenResourceError`` once the peer stream is closed -- confirm.
    """

    async def lproto_send_task():
        # Push without end; the test relies on something else stopping it.
        while True:
            await lproto.push(b"a")

    async def rproto_recv_task():
        # Keep pulling until pull() hands back a falsy value.
        while await rproto.pull(1):
            pass

    async with trio.open_nursery() as task_group:
        task_group.start_soon(lproto_send_task)
        task_group.start_soon(rproto_recv_task)
        await rproto.stream.aclose()
async def test_handles_lproto_closed_resource(lproto, rproto):
    """Close ``lproto``'s own stream while it keeps pushing.

    NOTE(review): judging by the test name, the sender is expected to hit
    ``ClosedResourceError`` on its own closed stream -- confirm.
    """

    async def lproto_send_task():
        # Push without end; the test relies on something else stopping it.
        while True:
            await lproto.push(b"a")

    async def rproto_recv_task():
        # Keep pulling until pull() hands back a falsy value.
        while await rproto.pull(1):
            pass

    async with trio.open_nursery() as task_group:
        task_group.start_soon(lproto_send_task)
        task_group.start_soon(rproto_recv_task)
        await lproto.stream.aclose()
from contextlib import asynccontextmanager
import attr
from trio import BrokenResourceError, ClosedResourceError, Nursery
from trio.abc import Stream
@attr.s(auto_attribs=True)
class Protocol:
    """Pair a stream with the nursery to cancel when that stream fails.

    ``push`` and ``pull`` forward to the stream. If the stream raises
    ``BrokenResourceError`` or ``ClosedResourceError``, the error is printed
    and the nursery's cancel scope is cancelled; the exception itself is
    swallowed.
    """

    # Identifier supplied by the caller; no method of this class reads it.
    id: int
    # Stream that messages are sent to and received from.
    stream: Stream
    # Nursery whose cancel scope is cancelled on a stream failure.
    nursery: Nursery

    async def push(self, message):
        """Send ``message`` via ``stream.send_all``.

        A broken or closed stream is reported on stdout and cancels
        ``nursery``; nothing is re-raised.
        """
        try:
            await self.stream.send_all(message)
        except (ClosedResourceError, BrokenResourceError) as exc:
            print(f"push: {type(exc)}: {exc}")
            scope = self.nursery.cancel_scope
            scope.cancel()

    async def pull(self, length):
        """Return whatever ``stream.receive_some(length)`` yields.

        A broken or closed stream is reported on stdout and cancels
        ``nursery``; in that case ``None`` is returned.
        """
        try:
            received = await self.stream.receive_some(length)
        except (ClosedResourceError, BrokenResourceError) as exc:
            print(f"pull: {type(exc)}: {exc}")
            scope = self.nursery.cancel_scope
            scope.cancel()
            return None
        return received
@asynccontextmanager
async def open_protocol(id, stream, nursery):
    """Async context manager yielding a ``Protocol`` and closing its stream.

    Args:
        id: Identifier stored on the protocol.
        stream: Stream the protocol sends to and receives from.
        nursery: Nursery the protocol cancels when the stream fails.

    Yields:
        The newly built ``Protocol``.

    On exit, whether normal or via an exception, the protocol's stream is
    closed with ``aclose()``.
    """
    # Build the protocol before entering the try block. If construction
    # raised inside it, the finally clause would reference an unbound
    # ``protocol`` and mask the real error with UnboundLocalError.
    protocol = Protocol(id, stream, nursery)
    try:
        yield protocol
    finally:
        await protocol.stream.aclose()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment