Last active
July 9, 2020 14:56
-
-
Save remdragon/6dd5c70455ff6f499336f4f192dec36b to your computer and use it in GitHub Desktop.
trio_win32
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import trio_win32 | |
async def main() -> int: | |
# TODO: start other tasks here... | |
await trio_win32.event_loop() | |
return 0 | |
sys.exit ( trio_win32.run ( main ) ) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# NOTE: this is a rough draft mock up, there are likely typos, etc... | |
from collections import deque | |
import ctypes | |
PM_REMOVE = 1 | |
QS_POSTMESSAGE = 0x8 | |
WAIT_OBJECT_0 = 0 | |
WAIT_ABANDONED = 0x80 | |
class _EventLoop: | |
handle: int | |
queue: deque[Coroutine[Any,Any,Any]] | |
running: bool = False | |
_loop: Opt[_EventLoop] = None | |
def trio_win32_run ( amain: Coroutine[Any,Any,Any] ) -> Any: | |
global _loop | |
assert _loop is None | |
_loop = _EventLoop() | |
_loop.event = kernel32.CreateEventEx ( None, None, 0, 0 ) | |
assert _loop.event # msdn says NULL -> failure | |
_loop.queue = deque() | |
_loop.running = True | |
trio.lowlevel.start_guest_run ( | |
amain, | |
run_sync_soon_threadsafe = _run_sync_soon_threadsafe, | |
done_callback = _done_callback, | |
) | |
def _run_sync_soon_threadsafe ( func: Coroutine[Any,Any,Any] ) -> None: | |
# IMPORTANT: add func to queue *before* triggering event | |
assert _loop is not None | |
_loop.queue.append ( func ) | |
kernel32.SetEvent ( _loop.handle ) | |
def _done_callback ( trio_main_outcome ): | |
log = logger.getChild ( '_done_callback' ) | |
global _loop | |
log.info ( f'Trio ended with {trio_main_outcome!r}' ) | |
_loop.running = False | |
kern32l.SetEvent ( _loop.handle ) | |
async def event_loop ( *, | |
dwMilliseconds: int = 0, # 0 = forever | |
is_dialog: bool = True, | |
quit: Opt[Callable[[],bool]] = lambda: False, | |
) -> int: | |
global _loop | |
while _loop.running and not quit(): | |
# this function will process events until WM_QUIT is posted | |
# however, the optional quit lambda gives you a parachute | |
# that can be triggered from a popup child window event loop | |
# like a modal dialog ( enable it from WM_CLOSE ) | |
pHandles = ( ctypes.c_void_p * 1 ) ( _loop.event ) | |
nCount = 1 | |
dwWakeMask = QS_POSTMESSAGE | |
dwFlags = 0 # TODO FIXME: may need to play with these | |
trigger = user32.MsgWaitForMultipleObjectsEx ( | |
ctypes.byref ( pHandles ), | |
nCount, | |
dwMilliseconds, | |
dwWakeMask, | |
dwFlags, | |
) | |
if trigger == WAIT_OBJECT_0: # event handle triggered | |
global _loop | |
assert _loop is not None | |
kernel32.ResetEvent ( _loop.handle ) | |
while not quit(): # process all pending trio callbacks | |
try: | |
func = _loop.queue.popleft() | |
except IndexError: | |
break # queue is empty | |
else: | |
func() | |
elif trigger == WAIT_OBJECT_0 + count: # GetMessage() is hungry | |
msg = MSG() | |
# process all pending win32 messages: | |
while user32.PeekMessage ( msg, None, 0, 0, PM_REMOVE ): | |
if msg.message == WM_QUIT or quit(): | |
return | |
if is_dialog and user32.IsDialogMessage ( msg ): | |
continue | |
user32.TranslateMessage ( msg ) | |
user32.DispatchMessage ( msg ) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment