Skip to content

Instantly share code, notes, and snippets.

@njsmith
Last active May 13, 2023 18:59
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save njsmith/3494ebd27f32c2ac168c71de116ccc5d to your computer and use it in GitHub Desktop.
Save njsmith/3494ebd27f32c2ac168c71de116ccc5d to your computer and use it in GitHub Desktop.
# Rough draft of a Queue object that can be used simultaneously simultaneously from
# sync threads + *multiple* trio threads, all at once.
#
# If you don't have multiple threads each doing their own separate calls to trio.run,
# then don't use this; there are simpler solutions. This was mostly an exercise to
# figure out if and how this could be done.
#
# Also note: completely untested, probably has bugs
from collections import OrderedDict
from functools import partial
import outcome
import threading
import trio
class CrossThreadUnbufferedFIFOQueue:
def __init__(self):
self._lock = attr.ib(factory=threading.Lock)
# Used as FIFO queues; value is always None
self._putters = OrderedDict()
self._getters = OrderedDict()
def sync_put(self, value):
with self._lock:
if self._getters:
getter, _ = self._getters.popitem(last=False)
getter(value)
return
# Blocking path
waker = threading.Lock()
waker.acquire()
def putter():
waker.release()
return value
self._putters[putter] = None
waker.acquire()
def sync_get(self):
with self._lock:
if self._putters:
putter, _ = self._putters.popitem(last=False)
return putter()
# Blocking path
waker = threading.Lock()
waker.acquire()
value_shared = None
def getter(value):
nonlocal value_shared
value_shared = value
waker.release()
self._getters[getter] = None
waker.acquire()
return value_shared
async def async_put(self, value):
with self._lock:
if self._getters:
getter, _ = self._getters.popitem(last=False)
getter(value)
return
# Blocking path
token = trio.lowlevel.current_token()
task = trio.lowlevel.current_task()
def putter():
if trio.lowlevel.current_token() is token:
trio.lowlevel.reschedule(task)
return value
else:
trio.from_thread.run_sync(partial(
trio.lowlevel.reschedule, task, trio_token=token
))
return value
self._putters[putter] = None
def abort_fn(_):
with self._lock:
if putter in self._putters:
del self._putters
return trio.lowlevel.Abort.SUCCEEDED
else:
return trio.lowlevel.Abort.FAILED
await trio.wait_task_rescheduled(abort_fn)
async def async_get(self):
with self._lock:
if self._putters:
putter, _ = self._putters.popitem(last=False)
return putter()
# Blocking path
token = trio.lowlevel.current_token()
task = trio.lowlevel.current_task()
def getter(value):
if trio.lowlevel.current_token() is token:
trio.lowlevel.reschedule(task, outcome.Value(value))
else:
trio.from_thread.run_sync(partial(
trio.lowlevel.reschedule,
task,
outcome.Value(value),
trio_token=token
))
self._getters[getter] = None
def abort_fn(_):
with self._lock:
if getter in self._getters:
del self._getters
return trio.lowlevel.Abort.SUCCEEDED
else:
return trio.lowlevel.Abort.FAILED
return await trio.wait_task_rescheduled(abort_fn)
@njsmith
Copy link
Author

njsmith commented Jan 24, 2021

Oh haha whoops, those first two classes are vestigial leftovers from an earlier draft, theyr'e not actually used at all :-) I'll delete.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment