Skip to content

Instantly share code, notes, and snippets.

@mrzechonek
Created February 11, 2019 06:36
Show Gist options
  • Save mrzechonek/6e2055df6fa1b6995f8dd31784b67a77 to your computer and use it in GitHub Desktop.
Save mrzechonek/6e2055df6fa1b6995f8dd31784b67a77 to your computer and use it in GitHub Desktop.
#!python3
import signal
from contextlib import suppress
from concurrent.futures import ThreadPoolExecutor
from gi.repository import GLib
from prompt_toolkit import prompt
from prompt_toolkit.eventloop.defaults import get_event_loop, set_event_loop
from prompt_toolkit.patch_stdout import patch_stdout
from prompt_toolkit.eventloop.base import EventLoop
from prompt_toolkit.eventloop.context import wrap_in_current_context
from prompt_toolkit.eventloop.utils import ThreadWithFuture
class GLibEventLoop(EventLoop):
"""
Wrapper around the GLib event loop, but compatible with prompt_toolkit.
"""
def __init__(self, loop=None):
super(GLibEventLoop, self).__init__()
self.loop = loop or GLib.MainLoop()
self.executor = ThreadPoolExecutor()
self.descriptors = {}
self.signals = {}
self.closed = False
def close(self):
# Note: we should not close the asyncio loop itself, because that one
# was not created here.
self.closed = True
def run_until_complete(self, future, inputhook=None):
if inputhook:
raise ValueError("GLibEventLoop doesn't support input hooks.")
future.add_done_callback(lambda *args: self.loop.quit())
with self.executor:
self.loop.run()
def run_forever(self, inputhook=None):
if inputhook:
raise ValueError("GLibEventLoop doesn't support input hooks.")
with self.executor:
self.loop.run()
def run_in_executor(self, callback, _daemon=False):
if _daemon:
# Asyncio doesn't support 'daemon' executors.
th = ThreadWithFuture(callback, daemon=True)
self.call_from_executor(th.start)
return th.future
else:
callback = wrap_in_current_context(callback)
f = self.executor.submit(callback)
return f
def call_from_executor(self, callback, _max_postpone_until=None):
"""
Call this function in the main event loop.
Similar to Twisted's ``callFromThread``.
"""
callback = wrap_in_current_context(callback)
GLib.idle_add(callback)
def add_reader(self, fd, callback):
" Start watching the file descriptor for read availability. "
callback = wrap_in_current_context(callback)
def io_watch(reader, flags):
callback()
return True
self.descriptors[fd] = GLib.io_add_watch(fd,
GLib.PRIORITY_DEFAULT,
GLib.IO_IN,
io_watch)
def remove_reader(self, fd):
" Stop watching the file descriptor for read availability. "
with suppress(KeyError):
GLib.Source.remove(self.descriptors.pop(fd))
def add_signal_handler(self, signum, handler):
if handler:
self.signals[signum] = GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, signum, handler)
else:
with suppress(KeyError):
GLib.Source.remove(self.signals.pop(signum))
def use_glib_event_loop(loop=None):
"""
Use the asyncio event loop for prompt_toolkit applications.
"""
# Don't create a new loop if the current one uses asyncio already.
current_loop = get_event_loop()
if current_loop and isinstance(current_loop, GLibEventLoop):
return
set_event_loop(GLibEventLoop(loop))
use_glib_event_loop()
def on_command(result):
if result:
print('You said: %s' % result)
p = prompt('Say something: ', async_=True)
def done(f):
try:
on_command(f.result())
except KeyboardInterrupt as ex:
print('Exception', type(ex), ex)
GLib.MainLoop().quit()
p.add_done_callback(done)
if __name__ == '__main__':
loop = GLib.MainLoop()
GLib.idle_add(lambda: on_command(None))
with patch_stdout():
loop.run()
# print('You said: %s' % result)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment