Skip to content

Instantly share code, notes, and snippets.

@DrPyser
Created January 25, 2019 05:32
Show Gist options
  • Save DrPyser/443c6037faf5d1a1b5364aba2ca15adb to your computer and use it in GitHub Desktop.
Save DrPyser/443c6037faf5d1a1b5364aba2ca15adb to your computer and use it in GitHub Desktop.
Sync and async implementations of bidirectionnal channel for in-process asynchronous communication, inspired by Go's
"""
Channel interface and implementations
for in-memory bidirectional, asynchronous and cross-thread communication
"""
import asyncio
from typing import Generic, TypeVar, AsyncIterable, AsyncIterator, Iterable
import abc
import queue
import threading
T = TypeVar("T")
class AsyncChannel(Generic[T], AsyncIterable[T]):
@abc.abstractmethod
async def send(self, message: T) -> None: pass
@abc.abstractmethod
async def receive(self) -> T: pass
@abc.abstractmethod
def mirrored(self) -> "AsyncChannel[T]":
"""
Return a mirror of this channel,
such that the mirror receives what this channel sends,
and this channel receives what the mirror sends.
"""
pass
class Channel(Generic[T], Iterable[T]):
@abc.abstractmethod
def send(self, message: T) -> None: pass
@abc.abstractmethod
def receive(self) -> T: pass
@abc.abstractmethod
def mirrored(self) -> "Channel[T]":
"""
Return a mirror of this channel,
such that the mirror receives what this channel sends,
and this channel receives what the mirror sends.
"""
pass
class AsyncioChannel(AsyncChannel[T]):
"""
Asynchronous bidirectional Channel implementation
based on the asyncio.Queue interface.
"""
def __init__(self, inbox: asyncio.Queue, outbox: asyncio.Queue, options):
self.inbox = inbox
self.outbox = outbox
self.pending = len(self.inbox)
self.options = options
self.options.setdefault("autocommit", True)
self.options.setdefault("autoflush", False)
async def send(self, message):
"""
Send a message across the channel.
If self.options["flush"] is True, this also waits
for the message to be received and processed before returning.
"""
queue = self.outbox
await queue.put(message)
if self.options["autoflush"]:
await self.flush()
async def receive(self):
queue = self.inbox
msg = await queue.get()
self.pending += 1
if self.options["autocommit"]:
self.commit()
return msg
def commit(self):
"""
Acknowledge all messages received and yet unacknowledged.
If self.options["autocommit"] is True (the default),
this is called automatically as part of each self.receive() call,
so no received value ever goes unacknowledged.
Setting self.options["autocommit"] to False allows
each received item to be acknowledged explicitly,
e.g. only when fully processed.
This is only useful if an owner of a mirror channel(i.e. self.mirrored())
awaits self.flush(), or if its self.options["autoflush"] is set to True.
"""
for _ in range(self.pending):
self.inbox.task_done()
self.pending -= 1
assert self.pending == 0
async def flush(self):
"""
block until all outbound messages
have been processed and acknowledged.
If self.options["autoflush"] is True,
`self.send` always calls this automatically,
so that every message needs to be acknowledged
directly after being sent,
and consequently the outbox is always empty.
"""
await self.outbox.join()
async def __aiter__(self):
while not self.inbox.empty():
msg = await self.receive()
yield msg
def mirrored(self, options: dict=None):
mirror_options = dict(self.options)
if options is not None:
mirror_options.update(
options
)
return AsyncioChannel(
inbox=self.outbox,
outbox=self.inbox,
options=mirror_options
)
class ThreadsafeChannel(Channel[T]):
"""
Synchronous Bidirectional channel implementation using queue.Queue.
Consequently, it's threadsafe.
"""
def __init__(self, inbox: queue.Queue, outbox: queue.Queue, options):
self.inbox = inbox
self.outbox = outbox
self._lock = threading.Lock()
self.pending = len(self.inbox)
self._in_block = options.pop("in_block", True)
self._out_block = options.pop("out_block", True)
self._in_timeout = options.pop("in_timeout", None)
self._out_timeout = options.pop("out_timeout", None)
self._autocommit = options.pop("autocommit", True)
self._autoflush = options.pop("autoflush", False)
self._autocommit_backup = None
def send(self, message):
self.outbox.put(
message,
block=self._out_block,
timeout=self._out_timeout
)
if self._autoflush:
self.flush()
def receive(self):
msg = self.inbox.get(
block=self._in_block,
timeout=self._in_timeout
)
# TODO: make sure it's threadsafe
with self._lock:
self.pending += 1
if self._autocommit:
self.commit()
return msg
def __iter__(self):
while not self.inbox.empty():
msg = self.receive()
yield msg
def commit(self):
"""
Acknowledge all messages received and yet unacknowledged.
If self.options["autocommit"] is True (the default),
this is called automatically as part of each self.receive() call,
so no received value ever goes unacknowledged.
Setting self.options["autocommit"] to False allows
each received item to be acknowledged explicitly,
e.g. only when fully processed.
This is only useful if an owner of a mirror channel(i.e. self.mirrored())
awaits self.flush(), or if its self.options["autoflush"] is set to True.
"""
# TODO: make sure it's threadsafe
with self._lock:
for _ in range(self.pending):
self.inbox.task_done()
self.pending -= 1
assert self.pending == 0
def flush(self):
"""
block until all outbound messages
have been processed and acknowledged.
If self.options["autoflush"] is True,
`self.send` always calls this automatically,
so that every message needs to be acknowledged
directly after being sent,
and consequently the outbox is always empty.
"""
self.outbox.join()
def mirrored(self, options: dict=None):
mirror_options = dict(
autoflush=self._autoflush,
autocommit=self._autocommit,
in_block=self._in_block,
out_block=self._out_block,
in_timeout=self._in_timeout,
out_timeout=self._out_timeout
)
if options is not None:
mirror_options.update(
options
)
return ThreadsafeChannel(
inbox=self.outbox,
outbox=self.inbox,
options=mirror_options
)
def __enter__(self):
# TODO: make threadsafe
with self._lock:
self._autocommit_backup = self._autocommit
self._autocommit = False
return self
def __exit__(self, exc_type, exc_value, exc_tb):
with self._lock:
self._autocommit = self._autocommit_backup
if exc_value is not None:
self.commit()
def sync_channel(options: dict=None) -> Channel[T]:
"""
create a new synchronous threadsafe channel
that can be used for communication across threads.
The interface is synchronous and blocking.
:param options: a dictionary of options to configure the channel.
Supported options:
- `in_maxsize`: maxsize parameter of the inbox queue
- `out_maxsize`: maxsize parameter of the outbox queue
"""
if options is None:
options = dict()
options.setdefault("in_maxsize", 1)
options.setdefault("out_maxsize", 1)
inbox, outbox = queue.Queue(options["in_maxsize"]), queue.Queue(options["out_maxsize"])
return ThreadsafeChannel(inbox, outbox, options=options)
def async_channel(options: dict=None) -> AsyncChannel[T]:
"""
create a new asynchronous channel
that can be used for communication between tasks/coroutines
running in the same thread.
The implementation is not threadsafe.
:param options: a dictionary of options to configure the channel.
Supported options:
- `in_maxsize`: maxsize parameter of the inbox queue
- `out_maxsize`: maxsize parameter of the outbox queue
"""
if options is None:
options = dict()
options.setdefault("in_maxsize", 1)
options.setdefault("out_maxsize", 1)
inbox, outbox = asyncio.Queue(options["in_maxsize"]), asyncio.Queue(options["out_maxsize"])
return AsyncChannel(inbox, outbox, options=options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment