Skip to content

Instantly share code, notes, and snippets.

@pmp-p
Last active October 16, 2018 13:57
Show Gist options
  • Save pmp-p/c5bff782e9f2452c89c85d0ac8fc2ba3 to your computer and use it in GitHub Desktop.
Save pmp-p/c5bff782e9f2452c89c85d0ac8fc2ba3 to your computer and use it in GitHub Desktop.
asyncio repl clock with PyOS_InputHook
#!/usr/local/bin/python3.7 -i
# ^-------------- ****** N.B. USE THE -i parameter ******
import builtins
import asyncio
#handy modules you can have preloaded on repl
import os,sys,time
def schedule(fn, a):
# emulate "schedule" function https://docs.micropython.org/en/latest/library/micropython.html?highlight=schedule#micropython.schedule
# Schedule the function func to be executed “very soon”.
# The function is passed the value arg as its single argument.
# “Very soon” means that the MicroPython runtime will do its best to execute the function at the earliest possible time,
# given that it is also trying to be efficient, and that the following conditions hold:
# A scheduled function will never preempt another scheduled function.
# Scheduled functions are always executed “between opcodes” which means that all fundamental Python operations (such as appending to a list) are guaranteed to be atomic.
# A given port may define “critical regions” within which scheduled functions will never be executed.
# Functions may be scheduled within a critical region but they will not be executed until that region is exited.
# An example of a critical region is a preempting interrupt handler (an IRQ).
# thanks to :
# https://github.com/Drekin/win-unicode-console/blob/master/win_unicode_console/readline_hook.py
#
# FIXME:
# maybe set freq here https://github.com/python/cpython/blob/97cf0828727ac2a269c89c5aa09570a69a22c83c/Modules/readline.c#L1192
# Modules/readline.c#L1192 struct timeval timeout = {0, 100000}; /* 0.1 seconds */
# => 0.016 would be a match for javascript requestAnimationFrame
# https://developer.mozilla.org/en-US/docs/Web/API/window/requestAnimationFrame
if getattr(builtins, "scheduled", None) is None:
#! KEEP IT WOULD BE GC OTHERWISE!
global wrapper_ref
builtins.scheduled = []
import ctypes.util
from ctypes import pythonapi, cdll, cast, c_char_p, c_void_p, c_size_t, CFUNCTYPE
LIBC = cdll[ctypes.util.find_library("c")]
HOOKFUNC = CFUNCTYPE(c_char_p, c_void_p, c_void_p, c_char_p)
PyOS_InputHookFunctionPointer = c_void_p.in_dll(pythonapi, "PyOS_InputHook")
def scheduler(*a, **k):
# prevent reenter
lq = len(builtins.scheduled)
while lq:
fn, a = builtins.scheduled.pop(0)
fn(a)
lq -= 1
builtins.scheduler = scheduler
wrapper_ref = HOOKFUNC(builtins.scheduler)
PyOS_InputHookFunctionPointer.value = cast(wrapper_ref, c_void_p).value
builtins.scheduled.append((fn, a))
count = 0
def do_loop(*a):
loop = asyncio.get_event_loop()
loop.call_soon(loop.stop)
loop.run_forever()
schedule(do_loop,1)
async def async_clock(t=1):
global CLK
import crt
while True:
crt.set_text(CLK, )
await uasyncio.sleep(t)
class Task:
def __init__(self, io):
self.io = io
asyncio.get_event_loop().create_task(self.runner())
async def runner(self):
while True:
with self.io:
self.run()
await asyncio.sleep(self.io.timeframe)
def run():pass
import sys
class render:
out = sys.stdout.write
def __init__(self,framerate=60):
self.timeframe = 1.0/framerate # 0.016
self.out( '\x1b[H\x1b[2J' )
def __enter__(self):
self.out('\x1b7\x1b[?25l')
return self
def __exit__(self,*tb):
self.out('\x1b8\x1b[?25h')
sys.stdout.flush()
def __call__(self,*a,**kw):
self.out( '\x1b[{};{}H{}'.format( kw.get('y',12), kw.get('x',40) , ' '.join(a) ) )
class Clock(Task):
def run(self):
import time
self.io('%2d:%2d:%2d' % time.localtime()[3:6] , x=70, y=1 )
clockwork = Clock( render(framerate = 60) )
schedule( do_loop, 1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment