Created
March 8, 2017 06:26
-
-
Save njsmith/0e776d148f1953ced98a1211de61f99a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ast | |
import types | |
import ctypes | |
# Really what we want, I think are compile modes "async_exec", "async_single", | |
# "async_eval". These would: | |
# | |
# - turn on await | |
# - turn on the coroutine flags, so that passing the code to exec() returns a | |
# coroutine object | |
# - but don't have CO_NEWLOCALS set | |
# | |
# i.e., basically return the same code object you'd get inside func.__code__ | |
# if you wrapped an async def around the given code, except without | |
# CO_NEWLOCALS. | |
def wrap_in_async_def(raw_cell): | |
new_code = ["async def WRAPPER():\n"] | |
for line in raw_cell.split("\n"): | |
new_code.append(" ") | |
new_code.append(line) | |
return "".join(new_code) | |
# From Include/code.h: | |
CO_NEWLOCALS = 0x0002 | |
class PyCodeObject(ctypes.Structure): | |
_fields_ = [ | |
("PyObject_HEAD", ctypes.c_byte * object().__sizeof__), | |
("co_argcount", ctypes.c_int), | |
("co_kwonlyargcount", ctypes.c_int), | |
("co_nlocals", ctypes.c_int), | |
("co_stacksize", ctypes.c_int), | |
("co_flags", ctypes.c_int), | |
# ... and so on, but we only care about co_flags | |
] | |
def set_co_flags(codeobj, co_flags): | |
if not isinstance(codeobj, types.CodeType): | |
raise TypeError("code objects only!") | |
raw = ctypes.cast(ctypes.c_void_p(id(codeobj)), | |
ctypes.POINTER(PyCodeObject)) | |
raw.co_flags = co_flags | |
async def run_cell(raw_cell, filename, globals_, locals_): | |
# Wrap in an async def, and get the AST | |
mod = compile(wrap_in_async_def(raw_cell), filename, "exec", | |
ast.PyCF_ONLY_AST) | |
# Then modify the AST so we can get the last value: | |
# | |
# raw_ast looks like | |
# | |
# Module( | |
# body=[ | |
# AsyncFunctionDef(..., | |
# body=[ | |
# Expr(...), | |
# AugAssign(...), | |
# ... etc. ... | |
# ]) | |
# ]) | |
# | |
# If the last item in the function body is an Expr, then we want to return | |
# it. (This is like ipython's default mode where only the last statement | |
# in a cell gets printed; not like the built-in REPL where all bare | |
# expressions get printed.) | |
last = mod.body[0].body[-1] | |
if isinstance(last, ast.Expr): | |
mod.body[0].body[-1] = ast.Return(value=last.value) | |
# Now create a code object representing the function body | |
ns = {} | |
exec(mod, "exec", ns) | |
codeobj = ns["WRAPPER"].__code__ | |
# Finally, the tricky part: we want to run async_fn in the context of the | |
# given globals and locals. Globals are easy, but functions normally get | |
# their own locals environment, which in the REPL context would be really | |
# annoying. Internally, this "new locals" behavior is determined by the | |
# aptly named CO_NEWLOCALS flag, which is automatically set on every | |
# function's code object. But we can fix that. | |
set_co_flags(codeobj.co_flags, codeobj.co_flags & ~CO_NEWLOCALS) | |
# And finally, run it! | |
sys.displayhook(await exec(codeobj, globals_, locals_) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment