Skip to content

Instantly share code, notes, and snippets.

@pmp-p
Created November 10, 2018 15:18
Show Gist options
  • Save pmp-p/35ef4d32a11aae18208a8f30c5a26d20 to your computer and use it in GitHub Desktop.
Save pmp-p/35ef4d32a11aae18208a8f30c5a26d20 to your computer and use it in GitHub Desktop.
minimalist async repl ( with a clock ! )
# === utilities ===================
import builtins
# Expose the built-in print under the extra name ``builtins.__print__``.
builtins.__print__ = print


def pdb(*a, **k):
    """Debug print: forward everything to ``__print__``, defaulting to stderr.

    Takes the same positional and keyword arguments as ``print``.  When the
    caller does not pass ``file=...``, output goes to ``sys.__stderr__``.
    Returns whatever ``__print__`` returns.
    """
    options = {'file': sys.__stderr__}
    # Caller-supplied keywords (including an explicit ``file``) win.
    options.update(k)
    return __print__(*a, **options)
async def coroutine():
    """Do nothing; exists only so a coroutine object can be created."""


# Keep one sample coroutine object around so its ``__class__`` can be used to
# recognise coroutines.  It is never meant to run, so close it right away:
# an unstarted coroutine that is finalized without being closed makes the
# interpreter emit "RuntimeWarning: coroutine ... was never awaited".
coroutine = coroutine()
coroutine.close()
#==== display coroutines values ================
#1: coroutine()
#2: x=coroutine();x
#3: x=await coroutine()
#1/2/3
import sys
import asyncio
import builtins
#3
import readline
import ast
import textwrap
# Shortcut for writing to the interpreter's original stdout.
stdwrite = sys.__stdout__.write


async def await_displayhook(value):
    """Await *value*, echo its result, then restore the REPL prompt.

    On success the result is written as ``:async: <result>`` followed by a
    ``~~> `` marker.  A ``RuntimeError`` is reported on stdout in the same
    way; any other ``Exception`` is reported through ``pdb``.  In every case
    ``sys.ps1`` is reset from ``builtins.__ps1__`` before returning.
    """
    try:
        stdwrite(f':async: {await value}\n~~> ')
    except RuntimeError as e:
        stdwrite(f'{e.__class__.__name__}: {str(e)}\n~~> ')
    except Exception as e:
        # Exception *instances* have no ``__name__`` attribute (only the class
        # does); reading ``e.__name__`` here would raise AttributeError from
        # inside the handler and hide the real error.
        pdb(':', e, e.__class__.__name__)
    finally:
        sys.ps1 = builtins.__ps1__
# 1 & 2
def displayhook(value):
    """REPL display hook that schedules coroutine objects instead of just printing them.

    ``None`` is ignored.  For a coroutine object the prompt is blanked, an
    ``:async: awaiting <repr>`` notice is written and ``await_displayhook`` is
    scheduled on the current event loop to print the awaited result later.
    Any other value has its ``repr`` written to stdout.  ``builtins._`` is set
    to *value* at the end.
    """
    if value is None:
        return
    # Same ordering as the reference implementation in the sys.displayhook
    # documentation: reset `_` before computing the repr.
    builtins._ = None
    text = repr(value)
    if isinstance(value, coroutine.__class__):
        # Remember the prompt so it can be restored once the result arrives,
        # but never overwrite the saved prompt with the "" left behind by an
        # earlier awaitable that is still pending; that would lose the real
        # prompt for the rest of the session.
        if sys.ps1 != "" or not hasattr(builtins, '__ps1__'):
            builtins.__ps1__ = sys.ps1
        sys.ps1 = ""
        import asyncio
        stdwrite(f":async: awaiting {text}")
        asyncio.get_event_loop().create_task(await_displayhook(value))
    else:
        try:
            stdwrite(text)
        except UnicodeEncodeError:
            # Fall back to backslash escapes for characters stdout cannot encode.
            encoded = text.encode(sys.stdout.encoding, 'backslashreplace')
            if hasattr(sys.stdout, 'buffer'):
                sys.stdout.buffer.write(encoded)
            else:
                text = encoded.decode(sys.stdout.encoding, 'strict')
                stdwrite(text)
    # NOTE(review): the newline and the `_` update are applied on both
    # branches, as in the sys.displayhook reference implementation -- confirm.
    stdwrite("\n")
    builtins._ = value


sys.displayhook = displayhook
# 3
async def retry(index):
    """Re-run history line *index* as the body of a coroutine.

    The readline history entry is polled up to 10 times, 1 ms apart.  As soon
    as it is available and contains ``await``, the line is echoed, wrapped in
    a generated ``retry_async_body`` coroutine function, executed in this
    module's globals and awaited.  The generated code prints every local name
    created by the line and copies it into the globals.  If no suitable entry
    shows up, ``:async: code vanished`` is written instead.

    ``sys.ps1`` is reset from ``builtins.__ps1__`` on every exit path.
    """
    try:
        for _attempt in range(10):
            source = readline.get_history_item(index)
            if not (source and source.count('await')):
                # Entry missing or nothing to await yet: wait briefly, retry.
                await asyncio.sleep(.001)
                continue

            stdwrite(f'{source}"\n')
            # Indent the user's line so it sits inside the generated function.
            body = textwrap.indent(source, ' ' * 4)
            wrapped = f"""
#==========================================
import builtins
async def retry_async_body():
    __snapshot = list( locals().keys() )
{body}
    maybe_new = list( locals().keys() )
    while len(__snapshot):
        try:maybe_new.remove( __snapshot.pop() )
        except:pass
    maybe_new.remove('__snapshot')
    print('_'*30)
    while len(maybe_new):
        new_one = maybe_new.pop(0)
        print(new_one , ':=', locals()[new_one])
        globals()[new_one] = locals()[new_one]
#==========================================
"""
            # Defines retry_async_body in this module's globals.
            exec(compile(wrapped, '<asyncify>', 'exec'), globals(), globals())
            await retry_async_body()
            return stdwrite('~~> ')
        stdwrite(f':async: code vanished\n~~> ')
    finally:
        sys.ps1 = builtins.__ps1__
def excepthook(etype, e, tb):
    """Exception hook that retries ``<stdin>`` syntax errors as async code.

    A ``SyntaxError`` whose filename is ``'<stdin>'`` is not reported:
    the prompt is blanked and ``retry`` is scheduled for the most recent
    readline history entry.  Every other exception is passed to the
    interpreter's original ``sys.__excepthook__``.
    """
    if isinstance(e, SyntaxError) and e.filename == '<stdin>':
        index = readline.get_current_history_length()
        # Save the prompt for later restoration, but do not overwrite the
        # saved value with the "" left behind by an async operation that is
        # still pending; that would lose the real prompt for good.
        if sys.ps1 != "" or not hasattr(builtins, '__ps1__'):
            builtins.__ps1__ = sys.ps1
        sys.ps1 = ""
        stdwrite(f':async: asyncify "')
        asyncio.get_event_loop().create_task(retry(index))
        # discard trace
        return
    sys.__excepthook__(etype, e, tb)


sys.excepthook = excepthook
#======== run the asyncio loop interleaved with the repl
import sys
import builtins
# This script hooks into the interactive prompt, so refuse to continue unless
# the interpreter was started in inspect mode.
if not sys.flags.inspect:
    # Passing the message to SystemExit prints it on stderr and makes the
    # process exit with status 1 instead of reporting an error with status 0.
    raise SystemExit("Error: interpreter must be run with -i or PYTHONINSPECT must be set")
def schedule(fn, a):
    """Queue ``fn(a)`` to be called from the interpreter's input hook.

    The first call creates the queue ``builtins.scheduled`` and installs a
    ctypes callback as ``PyOS_InputHook``.  Every call then rebinds the
    module-level name ``schedule`` to a plain "append to the queue" function
    and queues ``(fn, a)`` through it.

    Args:
        fn: callable taking one argument.
        a: the argument handed to *fn* when the job runs.
    """
    if getattr(builtins, "scheduled", None) is None:
        builtins.scheduled = []

        from ctypes import CFUNCTYPE, c_int, c_void_p, cast, pythonapi

        # PyOS_InputHook is declared as ``int (*)(void)``.  Declaring the
        # callback with pointer arguments would make ctypes convert whatever
        # happens to be in the argument registers, so use the real prototype.
        HOOKFUNC = CFUNCTYPE(c_int)
        PyOS_InputHookFunctionPointer = c_void_p.in_dll(pythonapi, "PyOS_InputHook")

        def scheduler(*a, **k):
            """Run the jobs that were queued when the hook fired; return 0."""
            # prevent reenter: only drain what is queued right now, so a job
            # that re-schedules itself waits for the next hook invocation.
            pending = len(builtins.scheduled)
            while pending:
                job, arg = builtins.scheduled.pop(0)
                job(arg)
                pending -= 1
            return 0

        builtins.scheduler = scheduler
        # KEEP this reference: the ctypes thunk would be garbage collected
        # otherwise while the C pointer below still refers to it.
        builtins.wrapper_ref = HOOKFUNC(builtins.scheduler)
        PyOS_InputHookFunctionPointer.value = cast(builtins.wrapper_ref, c_void_p).value

    def schedule(fn, a):
        builtins.scheduled.append((fn, a))

    # Swap the module-level name for the lightweight version, then queue the
    # current request through it.
    __import__(__name__).schedule = schedule
    del schedule
    __import__(__name__).schedule(fn, a)
# ========== asyncio stepping ================
def aio_step(arg):
    """Run one pass of the global event loop ``aio``.

    If the loop has been closed, a ``:async: stopped`` notice and the current
    prompt are written to the original stdout and nothing else happens.
    Otherwise the loop runs until its queued ``stop`` callback fires, and the
    step is queued again through ``schedule`` when *arg* is truthy.
    """
    loop = aio
    if loop.is_closed():
        sys.__stdout__.write(f'\n:async: stopped\n{sys.ps1}')
        return

    # Queue stop() before starting so run_forever() returns after this pass.
    loop.call_soon(loop.stop)
    loop.run_forever()

    if not arg:
        return
    schedule(aio_step, arg)
def run(*entrypoints):
    """Create a task for each entry point and start stepping the event loop.

    Args:
        *entrypoints: zero-argument callables returning coroutines; each one
            is called and its coroutine is scheduled on the loop.

    Stores the loop in the module global ``aio`` and queues the first
    ``aio_step`` through ``schedule``.
    """
    global aio
    try:
        aio = asyncio.get_event_loop()
    except RuntimeError:
        # Newer Python versions raise instead of implicitly creating a loop
        # when none is set.  Install one so later get_event_loop() calls in
        # this file return the same loop.
        aio = asyncio.new_event_loop()
        asyncio.set_event_loop(aio)
    for entrypoint in entrypoints:
        aio.create_task(entrypoint())
    schedule(aio_step, 1)
#============ test program ===================
class tui:
    """Tiny terminal helper: positioned writes inside a save/restore-cursor block.

    Use as a context manager; the object returned by ``with`` is callable as
    ``ui(*strings, x=column, y=row)``.
    """

    # Writer used for all output (the interpreter's original stdout).
    out = sys.__stdout__.write

    def __enter__(self):
        # ESC 7 saves the cursor position, ESC[?25l hides the cursor.
        self.out('\x1b7\x1b[?25l')
        return self

    def __exit__(self, *tb):
        # ESC 8 restores the cursor position, ESC[?25h shows the cursor again.
        self.out('\x1b8\x1b[?25h')

    def __call__(self, *a, **kw):
        """Write the space-joined strings *a* at row ``y`` (default 12), column ``x`` (default 40)."""
        row = kw.get('y', 12)
        column = kw.get('x', 40)
        text = ' '.join(a)
        self.out('\x1b[{};{}H{}'.format(row, column, text))


# Shared instance used by the rest of the script.
tui.instance = tui()
sys.stdout.flush()
async def render_ui():
    """Redraw an HH:MM:SS clock at row 1, column 70 once per second, forever."""
    import time

    while True:
        # Bind the helper to ``draw`` rather than shadowing the built-in print.
        with tui.instance as draw:
            # draw a clock; zero-pad each field so 09:05:03 does not show up
            # as " 9: 5: 3"
            draw('%02d:%02d:%02d' % time.localtime()[3:6], x=70, y=1)
        # Flush right after drawing so the clock is visible immediately
        # instead of one sleep period late.
        sys.stdout.flush()
        await asyncio.sleep(1)
async def __main__(*a, **k):
    """Idle forever, sleeping one second at a time; all arguments are ignored."""
    nap_seconds = 1
    while True:
        await asyncio.sleep(nap_seconds)
async def test():
    """Sample coroutine: wait a second, print a message, wait again, return 666."""
    pause = 1
    await asyncio.sleep(pause)
    print('wait me !')
    await asyncio.sleep(pause)
    result = 666
    return result
# Start the two background coroutines; run() stores the event loop in the
# global `aio` and queues the first loop step.
run(__main__, render_ui)
# aio_step() stops re-running the loop once `aio` has been closed.
print("type aio.close() to halt asyncio background operations")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment