Last active
July 31, 2019 22:50
-
-
Save zthompson47/caa0d73427335ec694110e3795c79ef0 to your computer and use it in GitHub Desktop.
A simple REPL for Trio, with top-level `await` and built-in `nursery`
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ast | |
import sys | |
import traceback | |
from functools import partial | |
import trio | |
from pygments.styles import get_style_by_name | |
from pygments.lexers.python import PythonLexer | |
from prompt_toolkit.lexers import PygmentsLexer | |
from prompt_toolkit.patch_stdout import patch_stdout | |
from prompt_toolkit.styles.pygments import style_from_pygments_cls | |
from prompt_toolkit import prompt, HTML, PromptSession | |
# Primary (">>>") and continuation ("...") prompt messages, each wrapped in
# <yellow> markup.
PS1 = HTML("<yellow>>>></yellow> ")
PS2 = HTML("<yellow>...</yellow> ")
# prompt_toolkit style derived from the Pygments "monokai" style.
STYLE = style_from_pygments_cls(get_style_by_name("monokai"))

# Source template for input that starts with an expression.  The entered code
# replaces ``{}`` and its value is bound to ``result``.  The generated
# coroutine returns ``(locals, result)`` with the ``nursery`` entry removed
# from the locals mapping.
_wrap_eval = """async def __wrap(nursery):
    result = {}
    _locals = locals()
    del _locals["nursery"]
    return (_locals, result)
"""

# Source template for input that starts with a statement.  The entered code
# replaces ``{}`` as the first statement of the body; the generated coroutine
# returns ``(locals, None)`` with the ``nursery`` entry removed.
_wrap_exec = """async def __wrap(nursery):
    {}
    _locals = locals()
    del _locals["nursery"]
    return (_locals, None)
"""
class MainWrap(object):
    """Interactive Trio REPL with top-level ``await`` and a shared nursery.

    Each entered snippet is substituted into the module's ``_wrap_eval`` or
    ``_wrap_exec`` template, which defines ``async def __wrap(nursery)``.
    That coroutine function is then awaited with the REPL's nursery, and the
    local names it reports are copied into the module globals so that later
    snippets can use them.
    """

    @staticmethod
    def _needs_more_input(source):
        """Return True if *source* is unfinished rather than invalid.

        Uses ``codeop.compile_command``, which returns ``None`` for input
        that is a valid prefix of a statement.  This replaces matching on the
        text of the parser's error message, which differs between Python
        versions.
        """
        import codeop  # local import: codeop is not among the file's imports

        try:
            return codeop.compile_command(source, "<stdin>", "single") is None
        except (SyntaxError, ValueError, OverflowError):
            # Definitely invalid input, not merely incomplete.
            return False

    @staticmethod
    def _fill_template(template, code):
        """Substitute *code* for the ``{}`` placeholder in *template*.

        Lines of *code* after the first are prefixed with the leading
        whitespace of the template line holding the placeholder, so a
        multi-line snippet keeps a consistent indentation level inside the
        generated function body.
        """
        indent = ""
        for line in template.splitlines():
            if "{}" in line:
                indent = line[: len(line) - len(line.lstrip())]
                break
        return template.format(code.replace("\n", "\n" + indent))

    async def _read_line(self, session, message):
        """Prompt with *message* in a worker thread and return the text.

        The blocking ``session.prompt`` call is run through
        ``trio.to_thread.run_sync`` under ``patch_stdout`` for both the
        primary and the continuation prompt.
        """
        ask = partial(
            session.prompt, message, style=STYLE,
            lexer=PygmentsLexer(PythonLexer),
            include_default_pygments_style=False,
            vi_mode=True,
        )
        with patch_stdout():
            return await trio.to_thread.run_sync(ask)

    async def main(self):
        """Run the read-eval-print loop until Ctrl-C or end of input.

        ``KeyboardInterrupt`` or ``EOFError`` raised while prompting cancels
        the nursery's cancel scope, which ends the REPL.
        """
        session = PromptSession()
        async with trio.open_nursery() as nursery:
            try:
                while True:
                    code = await self._read_line(session, PS1)
                    try:
                        ast_code = ast.parse(code)
                    except SyntaxError:
                        if not self._needs_more_input(code):
                            traceback.print_exc()
                            continue
                        # Unfinished statement: collect continuation lines
                        # until an empty one.  Lines are joined verbatim so
                        # that dedented clauses such as ``else:`` still parse.
                        while True:
                            nextline = await self._read_line(session, PS2)
                            if not nextline:
                                break
                            code += "\n" + nextline
                        try:
                            ast_code = ast.parse(code)
                        except SyntaxError:
                            traceback.print_exc()
                            continue
                    if not ast_code.body:
                        continue
                    if isinstance(ast_code.body[0], ast.Expr):
                        template = _wrap_eval
                    else:
                        template = _wrap_exec
                    # Define the wrapper in an explicit namespace instead of
                    # relying on exec() writing into this function's locals.
                    namespace = {}
                    try:
                        exec(
                            self._fill_template(template, code),
                            globals(),
                            namespace,
                        )
                        inner_locals, result = await namespace["__wrap"](
                            nursery
                        )
                        if result is not None:
                            print(result)
                    except (Exception, KeyboardInterrupt):
                        # Report the failure and keep the REPL alive.  Other
                        # BaseExceptions (trio.Cancelled, SystemExit) are
                        # deliberately left to propagate.
                        traceback.print_exc()
                    else:
                        # Publish the snippet's names for later snippets.
                        globals().update(inner_locals)
            except (KeyboardInterrupt, EOFError):
                nursery.cancel_scope.cancel()
# Script entry: run the REPL coroutine ``MainWrap.main`` under ``trio.run``.
trio.run(MainWrap().main)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import ast | |
import sys | |
import traceback | |
from functools import partial | |
import asyncio | |
import trio | |
from pygments.styles import get_style_by_name | |
from pygments.lexers.python import PythonLexer | |
from prompt_toolkit.lexers import PygmentsLexer | |
from prompt_toolkit.patch_stdout import patch_stdout | |
from prompt_toolkit.styles.pygments import style_from_pygments_cls | |
from prompt_toolkit import prompt, HTML, PromptSession | |
from prompt_toolkit.key_binding import KeyBindings | |
from prompt_toolkit.application.current import get_app_session | |
# Primary (">>>") and continuation ("...") prompt messages, each wrapped in
# <yellow> markup.
PS1 = HTML("<yellow>>>></yellow> ")
PS2 = HTML("<yellow>...</yellow> ")
# prompt_toolkit style derived from the Pygments "monokai" style.
STYLE = style_from_pygments_cls(get_style_by_name("monokai"))
# One indentation step: four spaces.
TAB = " " * 4
# Key bindings passed to the prompt session.
_bindings = KeyBindings()


@_bindings.add("c-i")
def _bind(event):
    """Insert ``TAB`` into the current buffer when "c-i" is pressed."""
    target = event.current_buffer
    target.insert_text(TAB)
# Source template for input that starts with an expression.  The entered code
# replaces ``{}`` and its value is bound to ``result``.  The generated
# coroutine returns ``(locals, result)`` with the ``nursery`` entry removed
# from the locals mapping.
_wrap_eval = """async def __wrap(nursery):
    result = {}
    _locals = locals()
    del _locals["nursery"]
    return (_locals, result)
"""

# Source template for input that starts with a statement.  The entered code
# replaces ``{}`` as the first statement of the body; the generated coroutine
# returns ``(locals, None)`` with the ``nursery`` entry removed.
_wrap_exec = """async def __wrap(nursery):
    {}
    _locals = locals()
    del _locals["nursery"]
    return (_locals, None)
"""
async def ptk_asyncio_task(session, ps, indent=""):
    """Await one line of input from *session* under ``patch_stdout``.

    *ps* is the prompt message and *indent* is handed to the prompt as its
    ``default`` text.  Returns whatever ``session.prompt_async`` returns.
    """
    with patch_stdout():
        options = {
            "style": STYLE,
            "lexer": PygmentsLexer(PythonLexer),
            "include_default_pygments_style": False,
            "vi_mode": True,
            "default": indent,
            "key_bindings": _bindings,
        }
        entered = await session.prompt_async(ps, **options)
    return entered
class MainWrap(object):
    """Interactive Trio REPL with top-level ``await`` and a shared nursery.

    Each entered snippet is substituted into the module's ``_wrap_eval`` or
    ``_wrap_exec`` template, which defines ``async def __wrap(nursery)``.
    That coroutine function is then awaited with the REPL's nursery, and the
    local names it reports are copied into the module globals so that later
    snippets can use them.
    """

    @staticmethod
    def _needs_more_input(source):
        """Return True if *source* is unfinished rather than invalid.

        Uses ``codeop.compile_command``, which returns ``None`` for input
        that is a valid prefix of a statement.  This replaces matching on the
        text of the parser's error message, which differs between Python
        versions.
        """
        import codeop  # local import: codeop is not among the file's imports

        try:
            return codeop.compile_command(source, "<stdin>", "single") is None
        except (SyntaxError, ValueError, OverflowError):
            # Definitely invalid input, not merely incomplete.
            return False

    @staticmethod
    def _fill_template(template, code):
        """Substitute *code* for the ``{}`` placeholder in *template*.

        Lines of *code* after the first are prefixed with the leading
        whitespace of the template line holding the placeholder, so a
        multi-line snippet keeps a consistent indentation level inside the
        generated function body.
        """
        indent = ""
        for line in template.splitlines():
            if "{}" in line:
                indent = line[: len(line) - len(line.lstrip())]
                break
        return template.format(code.replace("\n", "\n" + indent))

    @staticmethod
    def _next_indent(line, tab):
        """Return the indentation to prefill after *line* has been entered.

        This is the leading whitespace of *line*, plus one *tab* when the
        line (ignoring trailing whitespace) ends with a colon.  Deriving it
        from the line itself lets the prefill follow a manual dedent.
        """
        leading = line[: len(line) - len(line.lstrip())]
        if line.rstrip().endswith(":"):
            leading += tab
        return leading

    async def _read_line(self, session, message, indent=""):
        """Prompt with *message* in a worker thread and return the text.

        ``ptk_asyncio_task`` is driven by ``asyncio.run`` inside the worker
        thread; *indent* is forwarded to it as the prefilled text.
        """
        def ask():
            # Create the coroutine in the worker thread so that it is never
            # left un-awaited if the thread call does not start.
            return asyncio.run(ptk_asyncio_task(session, message, indent))

        return await trio.to_thread.run_sync(ask)

    async def main(self):
        """Run the read-eval-print loop until end of input.

        ``KeyboardInterrupt`` abandons the current input and prompts again;
        ``EOFError`` cancels the nursery's cancel scope and leaves the loop.
        """
        session = PromptSession()
        async with trio.open_nursery() as nursery:
            while True:
                try:
                    code = await self._read_line(session, PS1)
                    try:
                        ast_code = ast.parse(code)
                    except SyntaxError:
                        if not self._needs_more_input(code):
                            traceback.print_exc()
                            continue
                        # Unfinished statement: collect continuation lines
                        # until an empty or whitespace-only one.  Lines are
                        # joined verbatim because the prompt already
                        # prefills their indentation.
                        indent = TAB
                        while True:
                            nextline = await self._read_line(
                                session, PS2, indent
                            )
                            if not nextline.strip():
                                break
                            code += "\n" + nextline
                            indent = self._next_indent(nextline, TAB)
                        try:
                            ast_code = ast.parse(code)
                        except SyntaxError:
                            traceback.print_exc()
                            continue
                    if not ast_code.body:
                        continue
                    if isinstance(ast_code.body[0], ast.Expr):
                        template = _wrap_eval
                    else:
                        template = _wrap_exec
                    # Define the wrapper in an explicit namespace instead of
                    # relying on exec() writing into this function's locals.
                    namespace = {}
                    try:
                        exec(
                            self._fill_template(template, code),
                            globals(),
                            namespace,
                        )
                        inner_locals, result = await namespace["__wrap"](
                            nursery
                        )
                        if result is not None:
                            print(result)
                    except (Exception, KeyboardInterrupt):
                        # Report the failure and keep the REPL alive.  Other
                        # BaseExceptions (trio.Cancelled, SystemExit) are
                        # deliberately left to propagate.
                        traceback.print_exc()
                    else:
                        # Publish the snippet's names for later snippets.
                        globals().update(inner_locals)
                except KeyboardInterrupt:
                    continue
                except EOFError:
                    nursery.cancel_scope.cancel()
                    # Leave the loop explicitly rather than waiting for the
                    # next checkpoint to raise.
                    break
# Script entry: run the REPL coroutine ``MainWrap.main`` under ``trio.run``.
trio.run(MainWrap().main)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment