Created
January 25, 2019 05:32
-
-
Save DrPyser/443c6037faf5d1a1b5364aba2ca15adb to your computer and use it in GitHub Desktop.
Sync and async implementations of a bidirectional channel for in-process asynchronous communication, inspired by Go's
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Channel interface and implementations | |
for in-memory bidirectional, asynchronous and cross-thread communication | |
""" | |
import asyncio | |
from typing import Generic, TypeVar, AsyncIterable, AsyncIterator, Iterable | |
import abc | |
import queue | |
import threading | |
T = TypeVar("T")


class AsyncChannel(Generic[T], AsyncIterable[T]):
    """
    Interface of an asynchronous, bidirectional channel of messages of type T.

    Concrete channels implement the ``send`` and ``receive`` coroutines and
    ``mirrored``; asynchronous iteration comes from ``AsyncIterable``.
    """

    @abc.abstractmethod
    async def send(self, message: T) -> None:
        """Send ``message`` through the channel."""

    @abc.abstractmethod
    async def receive(self) -> T:
        """Receive and return one message from the channel."""

    @abc.abstractmethod
    def mirrored(self) -> "AsyncChannel[T]":
        """
        Build the opposite end of this channel.

        What this channel sends, the mirror receives;
        what the mirror sends, this channel receives.
        """
class Channel(Generic[T], Iterable[T]):
    """
    Interface of a synchronous, bidirectional channel of messages of type T.

    Concrete channels implement ``send``, ``receive`` and ``mirrored``;
    iteration comes from ``Iterable``.
    """

    @abc.abstractmethod
    def send(self, message: T) -> None:
        """Send ``message`` through the channel."""

    @abc.abstractmethod
    def receive(self) -> T:
        """Receive and return one message from the channel."""

    @abc.abstractmethod
    def mirrored(self) -> "Channel[T]":
        """
        Build the opposite end of this channel.

        What this channel sends, the mirror receives;
        what the mirror sends, this channel receives.
        """
class AsyncioChannel(AsyncChannel[T]):
    """
    Asynchronous bidirectional Channel implementation
    based on the asyncio.Queue interface.

    Messages are read from ``inbox`` and written to ``outbox``. A mirror
    (see ``mirrored``) shares the same two queues with the roles swapped.

    Recognised options:
    - ``autocommit`` (default True): acknowledge each message as soon as
      it is received.
    - ``autoflush`` (default False): make ``send`` wait until the outbox
      has been fully acknowledged.
    """

    def __init__(self, inbox: asyncio.Queue, outbox: asyncio.Queue, options: dict=None):
        """
        :param inbox: queue this channel receives from.
        :param outbox: queue this channel sends to.
        :param options: option dictionary; it is copied, never mutated.
        """
        self.inbox = inbox
        self.outbox = outbox
        # Count of messages taken from the inbox but not yet acknowledged
        # with task_done(). Nothing has been received yet, so it starts at
        # zero (asyncio.Queue does not support len()).
        self.pending = 0
        # Work on a private copy so the caller's dict is left untouched.
        self.options = dict(options) if options is not None else {}
        self.options.setdefault("autocommit", True)
        self.options.setdefault("autoflush", False)

    async def send(self, message):
        """
        Send a message across the channel.
        If self.options["autoflush"] is True, this also waits
        for the message to be received and acknowledged before returning.
        """
        await self.outbox.put(message)
        if self.options["autoflush"]:
            await self.flush()

    async def receive(self):
        """
        Wait for the next inbound message and return it.
        If self.options["autocommit"] is True, the message is
        acknowledged before this method returns.
        """
        msg = await self.inbox.get()
        self.pending += 1
        if self.options["autocommit"]:
            self.commit()
        return msg

    def commit(self):
        """
        Acknowledge all messages received and yet unacknowledged.
        If self.options["autocommit"] is True (the default),
        this is called automatically as part of each self.receive() call,
        so no received value ever goes unacknowledged.
        Setting self.options["autocommit"] to False allows
        each received item to be acknowledged explicitly,
        e.g. only when fully processed.
        This is only useful if an owner of a mirror channel(i.e. self.mirrored())
        awaits its flush(), or if its options["autoflush"] is set to True.
        """
        # One task_done() per received message; the counter is decremented
        # step by step so it stays accurate even if task_done() raises.
        while self.pending > 0:
            self.inbox.task_done()
            self.pending -= 1

    async def flush(self):
        """
        Block until all outbound messages
        have been processed and acknowledged.
        If self.options["autoflush"] is True,
        `self.send` always calls this automatically,
        so that every message needs to be acknowledged
        directly after being sent,
        and consequently the outbox is always empty.
        """
        await self.outbox.join()

    async def __aiter__(self):
        """
        Yield inbound messages until the inbox is found empty.
        Iteration does not wait for future messages: it ends as soon
        as no message is queued.
        """
        while not self.inbox.empty():
            msg = await self.receive()
            yield msg

    def mirrored(self, options: dict=None):
        """
        Return the opposite end of this channel: a new AsyncioChannel
        sharing the same queues with inbox and outbox swapped.

        :param options: optional overrides applied on top of a copy
            of this channel's options.
        """
        mirror_options = dict(self.options)
        if options is not None:
            mirror_options.update(options)
        return AsyncioChannel(
            inbox=self.outbox,
            outbox=self.inbox,
            options=mirror_options
        )
class ThreadsafeChannel(Channel[T]):
    """
    Synchronous Bidirectional channel implementation using queue.Queue.
    Consequently, it's threadsafe.

    Messages are read from ``inbox`` and written to ``outbox``. A mirror
    (see ``mirrored``) shares the same two queues with the roles swapped.

    Recognised options:
    - ``in_block`` / ``in_timeout``: ``block`` and ``timeout`` arguments
      used when getting from the inbox (defaults True / None).
    - ``out_block`` / ``out_timeout``: ``block`` and ``timeout`` arguments
      used when putting to the outbox (defaults True / None).
    - ``autocommit`` (default True): acknowledge each message as soon as
      it is received.
    - ``autoflush`` (default False): make ``send`` wait until the outbox
      has been fully acknowledged.

    Used as a context manager, the channel disables autocommit for the
    duration of the ``with`` block and restores it on exit.
    """

    def __init__(self, inbox: queue.Queue, outbox: queue.Queue, options: dict=None):
        """
        :param inbox: queue this channel receives from.
        :param outbox: queue this channel sends to.
        :param options: option dictionary; it is copied, never mutated.
            Unrecognised keys are ignored.
        """
        # Work on a private copy so the caller's dict is left untouched.
        opts = dict(options) if options is not None else {}
        self.inbox = inbox
        self.outbox = outbox
        # Re-entrant lock: commit() takes the lock and may itself be called
        # from code that already holds it (receive(), __exit__()).
        self._lock = threading.RLock()
        # Count of messages taken from the inbox but not yet acknowledged
        # with task_done(). Nothing has been received yet, so it starts at
        # zero (queue.Queue does not support len()).
        self.pending = 0
        self._in_block = opts.pop("in_block", True)
        self._out_block = opts.pop("out_block", True)
        self._in_timeout = opts.pop("in_timeout", None)
        self._out_timeout = opts.pop("out_timeout", None)
        self._autocommit = opts.pop("autocommit", True)
        self._autoflush = opts.pop("autoflush", False)
        self._autocommit_backup = None

    def send(self, message):
        """
        Put a message on the outbox using the configured
        ``out_block`` / ``out_timeout`` settings.
        If autoflush is enabled, also wait for the outbox
        to be fully acknowledged before returning.
        """
        self.outbox.put(
            message,
            block=self._out_block,
            timeout=self._out_timeout
        )
        if self._autoflush:
            self.flush()

    def receive(self):
        """
        Get the next message from the inbox using the configured
        ``in_block`` / ``in_timeout`` settings and return it.
        If autocommit is enabled, the message is acknowledged
        before this method returns.
        """
        msg = self.inbox.get(
            block=self._in_block,
            timeout=self._in_timeout
        )
        with self._lock:
            self.pending += 1
            if self._autocommit:
                self.commit()
        return msg

    def __iter__(self):
        """
        Yield inbound messages until the inbox is found empty.
        Iteration does not wait for future messages.
        """
        while not self.inbox.empty():
            msg = self.receive()
            yield msg

    def commit(self):
        """
        Acknowledge all messages received and yet unacknowledged.
        If autocommit is enabled (the default),
        this is called automatically as part of each self.receive() call,
        so no received value ever goes unacknowledged.
        Disabling autocommit allows
        each received item to be acknowledged explicitly,
        e.g. only when fully processed.
        This is only useful if an owner of a mirror channel(i.e. self.mirrored())
        calls its flush(), or if its autoflush option is enabled.
        """
        with self._lock:
            # One task_done() per received message; the counter is
            # decremented step by step so it stays accurate even if
            # task_done() raises.
            while self.pending > 0:
                self.inbox.task_done()
                self.pending -= 1

    def flush(self):
        """
        Block until all outbound messages
        have been processed and acknowledged.
        If autoflush is enabled,
        `self.send` always calls this automatically,
        so that every message needs to be acknowledged
        directly after being sent,
        and consequently the outbox is always empty.
        """
        self.outbox.join()

    def mirrored(self, options: dict=None):
        """
        Return the opposite end of this channel: a new ThreadsafeChannel
        sharing the same queues with inbox and outbox swapped.

        :param options: optional overrides applied on top of this
            channel's current settings.
        """
        mirror_options = dict(
            autoflush=self._autoflush,
            autocommit=self._autocommit,
            in_block=self._in_block,
            out_block=self._out_block,
            in_timeout=self._in_timeout,
            out_timeout=self._out_timeout
        )
        if options is not None:
            mirror_options.update(options)
        return ThreadsafeChannel(
            inbox=self.outbox,
            outbox=self.inbox,
            options=mirror_options
        )

    def __enter__(self):
        """Remember the current autocommit setting, then disable it."""
        with self._lock:
            self._autocommit_backup = self._autocommit
            self._autocommit = False
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """
        Restore the autocommit setting saved by ``__enter__``.
        If the ``with`` block raised, pending messages are acknowledged;
        the exception itself is not suppressed.
        """
        with self._lock:
            self._autocommit = self._autocommit_backup
            # NOTE(review): committing only when an exception occurred may
            # be inverted (commit-on-success is the usual pattern); the
            # original condition is kept -- confirm the intent.
            if exc_value is not None:
                self.commit()
def sync_channel(options: dict=None) -> Channel[T]:
    """
    Create a new synchronous threadsafe channel
    that can be used for communication across threads.
    The interface is synchronous and blocking.

    :param options: a dictionary of options to configure the channel.
        The dictionary is copied; the caller's object is not modified.
        Supported options:
        - `in_maxsize`: maxsize parameter of the inbox queue (default 1)
        - `out_maxsize`: maxsize parameter of the outbox queue (default 1)
        The full dictionary is also passed on to ThreadsafeChannel.
    :return: a ThreadsafeChannel backed by two new queue.Queue objects.
    """
    # Copy first: defaults are filled in below and the channel constructor
    # consumes entries, neither of which should leak back to the caller.
    opts = dict(options) if options is not None else {}
    opts.setdefault("in_maxsize", 1)
    opts.setdefault("out_maxsize", 1)
    inbox = queue.Queue(opts["in_maxsize"])
    outbox = queue.Queue(opts["out_maxsize"])
    return ThreadsafeChannel(inbox, outbox, options=opts)
def async_channel(options: dict=None) -> AsyncChannel[T]:
    """
    Create a new asynchronous channel
    that can be used for communication between tasks/coroutines
    running in the same thread.
    The implementation is not threadsafe.

    :param options: a dictionary of options to configure the channel.
        The dictionary is copied; the caller's object is not modified.
        Supported options:
        - `in_maxsize`: maxsize parameter of the inbox queue (default 1)
        - `out_maxsize`: maxsize parameter of the outbox queue (default 1)
        The full dictionary is also passed on to AsyncioChannel.
    :return: an AsyncioChannel backed by two new asyncio.Queue objects.
    """
    # Copy first so the defaults filled in below do not leak to the caller.
    opts = dict(options) if options is not None else {}
    opts.setdefault("in_maxsize", 1)
    opts.setdefault("out_maxsize", 1)
    inbox = asyncio.Queue(opts["in_maxsize"])
    outbox = asyncio.Queue(opts["out_maxsize"])
    # Instantiate the concrete implementation: AsyncChannel is the abstract
    # interface and cannot be constructed.
    return AsyncioChannel(inbox, outbox, options=opts)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment