Created
November 10, 2018 15:18
-
-
Save pmp-p/35ef4d32a11aae18208a8f30c5a26d20 to your computer and use it in GitHub Desktop.
minimalist async repl ( with a clock ! )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# === utilities =================== | |
import builtins | |
builtins.__print__ = print | |
def pdb(*a,**k): | |
k.setdefault( 'file', sys.__stderr__ ) | |
return __print__(*a,**k) | |
async def coroutine():pass | |
coroutine = coroutine() | |
#==== display coroutines values ================ | |
#1: coroutine() | |
#2: x=coroutine();x | |
#3: x=await coroutine() | |
#1/2/3 | |
import sys | |
import asyncio | |
import builtins | |
#3 | |
import readline | |
import ast | |
import textwrap | |
stdwrite = sys.__stdout__.write | |
async def await_displayhook(value): | |
try: | |
stdwrite( f':async: {await value}\n~~> ') | |
except RuntimeError as e: | |
stdwrite(f'{e.__class__.__name__}: {str(e)}\n~~> ') | |
except Exception as e: | |
pdb(':',e,e.__name__) | |
finally: | |
sys.ps1 = builtins.__ps1__ | |
# 1 & 2 | |
def displayhook(value): | |
if value is None: | |
return | |
builtins._ = None | |
text = repr(value) | |
if isinstance(value,coroutine.__class__): | |
builtins.__ps1__ = sys.ps1 | |
sys.ps1 = "" | |
import asyncio | |
stdwrite(f":async: awaiting {text}") | |
asyncio.get_event_loop().create_task( await_displayhook(value) ) | |
else: | |
try: | |
stdwrite(text) | |
except UnicodeEncodeError: | |
bytes = text.encode(sys.stdout.encoding, 'backslashreplace') | |
if hasattr(sys.stdout, 'buffer'): | |
sys.stdout.buffer.write(bytes) | |
else: | |
text = bytes.decode(sys.stdout.encoding, 'strict') | |
stdwrite(text) | |
stdwrite("\n") | |
builtins._ = value | |
sys.displayhook = displayhook | |
# 3 | |
async def retry(index): | |
try: | |
for x in range(10): | |
code = readline.get_history_item(index) | |
if code and code.count('await'): | |
stdwrite(f'{code}"\n') | |
code = textwrap.indent(code,' '*4 ) | |
code = f""" | |
#========================================== | |
import builtins | |
async def retry_async_body(): | |
__snapshot = list( locals().keys() ) | |
{code} | |
maybe_new = list( locals().keys() ) | |
while len(__snapshot): | |
try:maybe_new.remove( __snapshot.pop() ) | |
except:pass | |
maybe_new.remove('__snapshot') | |
print('_'*30) | |
while len(maybe_new): | |
new_one = maybe_new.pop(0) | |
print(new_one , ':=', locals()[new_one]) | |
globals()[new_one] = locals()[new_one] | |
#========================================== | |
""" | |
#stdwrite(f'\n{code}~~> ') | |
bytecode = compile(code, '<asyncify>', 'exec') | |
exec(bytecode, globals(), globals()) | |
await retry_async_body() | |
return stdwrite('~~> ') | |
await asyncio.sleep(.001) | |
stdwrite(f':async: code vanished\n~~> ') | |
finally: | |
sys.ps1 = builtins.__ps1__ | |
def excepthook(etype, e, tb): | |
if isinstance(e, SyntaxError) and e.filename == '<stdin>': | |
index = readline.get_current_history_length() | |
builtins.__ps1__ = sys.ps1 | |
sys.ps1 = "" | |
stdwrite(f':async: asyncify "') | |
asyncio.get_event_loop().create_task( retry(index) ) | |
#discard trace | |
return | |
sys.__excepthook__(etype, e, tb) | |
sys.excepthook = excepthook | |
#======== have asyncio loop runs with interleaved with repl | |
import sys | |
import builtins | |
if not sys.flags.inspect: | |
print("Error: interpreter must be run with -i or PYTHONINSPECT must be set") | |
raise SystemExit | |
def schedule(fn, a): | |
if getattr(builtins, "scheduled", None) is None: | |
#! KEEP IT WOULD BE GC OTHERWISE! | |
# builtins.wrapper_ref | |
builtins.scheduled = [] | |
from ctypes import pythonapi, cast, c_char_p, c_void_p, CFUNCTYPE | |
HOOKFUNC = CFUNCTYPE(c_char_p, c_void_p, c_void_p, c_char_p) | |
PyOS_InputHookFunctionPointer = c_void_p.in_dll(pythonapi, "PyOS_InputHook") | |
def scheduler(*a, **k): | |
# prevent reenter | |
lq = len(builtins.scheduled) | |
while lq: | |
fn, a = builtins.scheduled.pop(0) | |
fn(a) | |
lq -= 1 | |
builtins.scheduler = scheduler | |
builtins.wrapper_ref = HOOKFUNC(builtins.scheduler) | |
PyOS_InputHookFunctionPointer.value = cast(builtins.wrapper_ref, c_void_p).value | |
def schedule(fn, a): | |
builtins.scheduled.append((fn, a)) | |
__import__(__name__).schedule = schedule | |
del schedule | |
__import__(__name__).schedule(fn,a) | |
# ========== asyncio stepping ================ | |
def aio_step(arg): | |
global aio | |
if aio.is_closed(): | |
sys.__stdout__.write(f'\n:async: stopped\n{sys.ps1}') | |
return | |
aio.call_soon(aio.stop) | |
aio.run_forever() | |
if arg: | |
schedule( aio_step, arg) | |
def run(*entrypoints): | |
global aio | |
aio = asyncio.get_event_loop() | |
for entrypoint in entrypoints: | |
aio.create_task( entrypoint() ) | |
schedule( aio_step, 1) | |
#============ test program =================== | |
class tui: | |
out = sys.__stdout__.write | |
def __enter__(self): | |
self.out('\x1b7\x1b[?25l') | |
return self | |
def __exit__(self,*tb): | |
self.out('\x1b8\x1b[?25h') | |
def __call__(self,*a,**kw): | |
self.out( '\x1b[{};{}H{}'.format( kw.get('y',12), kw.get('x',40) , ' '.join(a) ) ) | |
tui.instance = tui() | |
sys.stdout.flush() | |
async def render_ui(): | |
import time | |
while True: | |
with tui.instance as print: | |
#draw a clock | |
print('%2d:%2d:%2d' % time.localtime()[3:6] , x=70, y=1 ) | |
await asyncio.sleep(1) | |
sys.stdout.flush() | |
async def __main__(*a,**k): | |
while True: | |
await asyncio.sleep(1) | |
async def test(): | |
await asyncio.sleep(1) | |
print('wait me !') | |
await asyncio.sleep(1) | |
return 666 | |
run(__main__, render_ui) | |
print("type aio.close() to halt asyncio background operations") | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment