Skip to content

Instantly share code, notes, and snippets.

@decentral1se
Created May 17, 2020 14:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save decentral1se/f12d4f8df7051c80aa5d15be615fdaec to your computer and use it in GitHub Desktop.
Save decentral1se/f12d4f8df7051c80aa5d15be615fdaec to your computer and use it in GitHub Desktop.
locking.py
from string import ascii_letters
from typing import Dict
import attr
from trio import ClosedResourceError, Lock, open_nursery, run
from trio.abc import Stream
from trio.testing import memory_stream_pair
@attr.s(auto_attribs=True)
class Channel:
    """Pairs a numeric channel id with the stream associated with it."""

    # Numeric identifier for this channel (mandatory, no default).
    id: int = attr.ib()
    # Stream object associated with this channel (mandatory, no default).
    stream: Stream = attr.ib()
@attr.s(auto_attribs=True)
class Protocol:
    """One endpoint of a stream whose sends and receives share a single lock.

    Both ``recv`` and ``asend`` acquire ``self.lock`` before touching
    ``self.stream``, so at most one of them uses the stream at a time.

    NOTE(review): ``recv`` keeps the lock while it waits for incoming data,
    which means ``asend`` on the same instance cannot proceed until a message
    (or EOF) arrives from the peer. Confirm this coupling is intended.
    """

    # Label used in the messages printed by recv().
    id: int
    # Stream this endpoint reads from and writes to.
    stream: Stream
    # Lock acquired by both recv() and asend() around every stream operation.
    lock: Lock
    # Maximum number of bytes requested from the stream per receive call.
    MESSAGE_LENGTH: int = 1
    # Not referenced by any method visible in this class.
    TIMEOUT: int = 10
    # Not referenced by any method visible in this class; each instance gets
    # its own fresh dict via attr.Factory.
    channels: Dict[int, Channel] = attr.Factory(dict)

    async def recv(self):
        """Receive and print messages until a stop condition is met.

        The loop ends when any of the following happens:

        * the message ``b"Z"`` is received (it is printed first),
        * the stream reports end-of-file (an empty read),
        * the stream raises ``ClosedResourceError``.
        """
        while True:
            async with self.lock:
                # Only the receive call can raise ClosedResourceError, so the
                # try block is kept to that single statement.
                try:
                    resp = await self.stream.receive_some(self.MESSAGE_LENGTH)
                except ClosedResourceError:
                    return
                if not resp:
                    # An empty read means the peer closed its sending side;
                    # without this check the loop would spin forever on EOF.
                    return
                print(f"Protocol {self.id} received {resp.decode()}")
                if resp == b"Z":
                    return

    async def asend(self, message):
        """Send ``message`` (bytes) over the stream while holding the lock."""
        async with self.lock:
            await self.stream.send_all(message)
async def _run_protocol_side(protocol_id, stream):
    """Drive one endpoint: receive in the background, send every ASCII letter.

    A ``Protocol`` with the given id and a fresh ``Lock`` is created on
    ``stream``. Its ``recv`` loop is started as a background task, then each
    character of ``ascii_letters`` is sent as a one-byte message, in order.
    The nursery block does not exit until the background ``recv`` task has
    finished as well.
    """
    async with open_nursery() as nursery:
        protocol = Protocol(id=protocol_id, stream=stream, lock=Lock())
        nursery.start_soon(protocol.recv)
        for letter in ascii_letters:
            await protocol.asend(letter.encode())


async def left_side_protocol(stream):
    """Run the endpoint labelled 1 on ``stream``."""
    await _run_protocol_side(1, stream)


async def right_side_protocol(stream):
    """Run the endpoint labelled 2 on ``stream``."""
    await _run_protocol_side(2, stream)
async def main():
    """Create a connected in-memory stream pair and run both sides on it.

    The left-side task is scheduled first with the first stream of the pair,
    then the right-side task with the second stream.
    """
    stream_pair = memory_stream_pair()
    entry_points = (left_side_protocol, right_side_protocol)
    async with open_nursery() as nursery:
        for entry_point, side_stream in zip(entry_points, stream_pair):
            nursery.start_soon(entry_point, side_stream)
# Script entry point: hand main() to trio's run() so it executes to completion.
run(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment