Skip to content

Instantly share code, notes, and snippets.

@njsmith
Created March 8, 2017 06:26
Show Gist options
  • Save njsmith/0e776d148f1953ced98a1211de61f99a to your computer and use it in GitHub Desktop.
Save njsmith/0e776d148f1953ced98a1211de61f99a to your computer and use it in GitHub Desktop.
import ast
import ctypes
import sys
import types
# Really what we want, I think, are compile modes "async_exec", "async_single",
# "async_eval". These would:
#
# - turn on await
# - turn on the coroutine flags, so that passing the code to exec() returns a
# coroutine object
# - but don't have CO_NEWLOCALS set
#
# i.e., basically return the same code object you'd get inside func.__code__
# if you wrapped an async def around the given code, except without
# CO_NEWLOCALS.
def wrap_in_async_def(raw_cell):
    """Return source defining ``async def WRAPPER()`` with *raw_cell* as its body.

    Each line of *raw_cell* is indented by one space and placed under the
    ``async def WRAPPER():`` header, so that ``await`` is legal inside the
    cell's code.

    Args:
        raw_cell: The cell's source text, with lines separated by ``"\\n"``.

    Returns:
        The wrapped source as a single string (no trailing newline is added).
    """
    header = "async def WRAPPER():\n"
    if not raw_cell.strip():
        # A function definition needs at least one statement, so a blank cell
        # gets a no-op body instead of producing a SyntaxError.
        return header + " pass"
    # split("\n") discards the newlines, so they have to be put back when the
    # indented lines are reassembled; otherwise a multi-line cell would be
    # collapsed onto a single line.
    # NOTE: the indent is also inserted into continuation lines of multi-line
    # string literals inside the cell.
    return header + "\n".join(" " + line for line in raw_cell.split("\n"))
# From Include/code.h: the bit in a code object's co_flags that run_cell()
# masks out before running the wrapper's code object, so that the code does
# not get its own fresh locals environment.
CO_NEWLOCALS = 0x0002
class PyCodeObject(ctypes.Structure):
    """ctypes view of the leading fields of a code object's C struct.

    Only the fields up to and including ``co_flags`` are declared, because
    ``co_flags`` is the only field this module touches.

    NOTE(review): the field order of the C struct is specific to the
    interpreter version -- confirm it against Include/code.h of the
    interpreter in use before writing through this structure.
    """

    _fields_ = [
        # Opaque padding covering the object header; __sizeof__ must be
        # *called* to obtain the byte count used as the array length.
        ("PyObject_HEAD", ctypes.c_byte * object().__sizeof__()),
        ("co_argcount", ctypes.c_int),
        ("co_kwonlyargcount", ctypes.c_int),
        ("co_nlocals", ctypes.c_int),
        ("co_stacksize", ctypes.c_int),
        ("co_flags", ctypes.c_int),
        # ... and so on, but we only care about co_flags
    ]
def set_co_flags(codeobj, co_flags):
    """Overwrite the ``co_flags`` field of *codeobj* in place.

    The code object's memory is reinterpreted as a ``PyCodeObject`` structure
    and the new flags are written directly into it.

    Args:
        codeobj: The code object to modify.
        co_flags: The integer value to store in its ``co_flags`` field.

    Raises:
        TypeError: If *codeobj* is not a code object.
    """
    if not isinstance(codeobj, types.CodeType):
        raise TypeError("code objects only!")
    # In CPython, id() is the object's memory address, which lets us view the
    # object through the PyCodeObject layout.
    raw = ctypes.cast(ctypes.c_void_p(id(codeobj)),
                      ctypes.POINTER(PyCodeObject))
    # Go through .contents: assigning to an attribute of the pointer object
    # itself would only set a Python attribute on the pointer and never reach
    # the underlying struct.
    raw.contents.co_flags = co_flags
async def run_cell(raw_cell, filename, globals_, locals_):
    """Run *raw_cell* as async code and display the value of its last expression.

    The cell is wrapped in an ``async def``, the final statement is turned
    into a ``return`` if it is a bare expression, and the resulting function
    body is then evaluated against the supplied namespaces with the
    CO_NEWLOCALS flag cleared.

    Args:
        raw_cell: Source text of the cell.
        filename: Filename to associate with the compiled code.
        globals_: Globals mapping the cell runs against.
        locals_: Locals mapping the cell runs against.
    """
    # Wrap in an async def, and get the AST
    mod = compile(wrap_in_async_def(raw_cell), filename, "exec",
                  ast.PyCF_ONLY_AST)
    # Then modify the AST so we can get the last value. mod looks like
    #
    #   Module(
    #     body=[
    #       AsyncFunctionDef(...,
    #         body=[
    #           Expr(...),
    #           AugAssign(...),
    #           ... etc. ...
    #         ])
    #     ])
    #
    # If the last item in the function body is an Expr, then we want to return
    # it. (This is like ipython's default mode where only the last statement
    # in a cell gets printed; not like the built-in REPL where all bare
    # expressions get printed.)
    wrapper_def = mod.body[0]
    last = wrapper_def.body[-1]
    if isinstance(last, ast.Expr):
        # A hand-built node carries no position info, which compile() needs,
        # so borrow it from the statement being replaced.
        wrapper_def.body[-1] = ast.copy_location(
            ast.Return(value=last.value), last)
    # Now create a code object representing the function body. The AST has to
    # be compiled first; executing it defines WRAPPER in a scratch namespace.
    ns = {}
    exec(compile(mod, filename, "exec"), ns)
    codeobj = ns["WRAPPER"].__code__
    # Finally, the tricky part: we want to run the function body in the
    # context of the given globals and locals. Globals are easy, but functions
    # normally get their own locals environment, which in the REPL context
    # would be really annoying. Internally, this "new locals" behavior is
    # determined by the aptly named CO_NEWLOCALS flag, which is automatically
    # set on every function's code object. But we can fix that.
    set_co_flags(codeobj, codeobj.co_flags & ~CO_NEWLOCALS)
    # And finally, run it! eval() is used rather than exec() because exec()
    # always returns None, whereas eval() hands back the result of evaluating
    # the code object -- here, the coroutine to await.
    sys.displayhook(await eval(codeobj, globals_, locals_))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment