Skip to content

Instantly share code, notes, and snippets.

@embray
Last active June 22, 2020 08:07
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save embray/f95d9cbe677542ce19a1 to your computer and use it in GitHub Desktop.
Save embray/f95d9cbe677542ce19a1 to your computer and use it in GitHub Desktop.
Capture the contents of a with statement code block without executing the block
import contextlib
import dis
import inspect
import linecache
import struct
import sys
class SkipBlock(Exception):
    """Internal signal raised by the frame trace hook to abort a with block.

    ``capture_block`` installs a trace function that raises this exception
    and then catches it itself, so it should not normally reach callers.
    """
# Numeric opcodes used by capture_block when scanning raw bytecode.  They are
# looked up once at import time; each lookup raises KeyError if the running
# interpreter's ``dis.opmap`` has no opcode with that name.
# NOTE(review): confirm every supported interpreter still defines all four.
# Marks the start of a with statement; its argument locates the block's end.
_SETUP_WITH = dis.opmap['SETUP_WITH']
# Store into a local variable (argument indexes ``co_varnames``).
_STORE_FAST = dis.opmap['STORE_FAST']
# Store into a name (argument indexes ``co_names``).
_STORE_NAME = dis.opmap['STORE_NAME']
# Tuple-style target; the argument is the number of stores that follow.
_UNPACK_SEQUENCE = dis.opmap['UNPACK_SEQUENCE']
@contextlib.contextmanager
def capture_block(capture_target=False):
    """
    Context manager that returns the contents of the with statement block
    without allowing the block to execute.  Very hacky; use with caution.

    The calling frame's bytecode is scanned for the ``SETUP_WITH``
    instruction of the current line to work out where the block ends, the
    corresponding source lines are read through ``linecache``, and a trace
    function is installed on the calling frame that raises ``SkipBlock`` as
    soon as the first line of the block is about to run.  That exception is
    swallowed here, so execution resumes after the with statement.

    Parameters
    ----------
    capture_target : bool, optional
        When true, also yield the names bound by the ``as`` clause.

    Yields
    ------
    lines : list of str
        Source lines between the with statement line and the first line
        executed after the block.
    target : tuple of str
        Only when ``capture_target`` is true: the (flattened) names of the
        ``as`` targets, empty if they could not be determined.

    Raises
    ------
    RuntimeError
        If no ``SETUP_WITH`` instruction can be found at or after the
        current line of the calling frame.

    Example
    -------

    >>> def example():
    ...     print('Before with statement')
    ...     with capture_block() as lines:
    ...         a = 3
    ...         b = 5
    ...         print(a * b)
    ...         print('Hello with statement')
    ...     print('After with statement')
    ...     print('With statement lines:')
    ...     print(''.join(lines))
    ...
    >>> example()
    Before with statement
    After with statement
    With statement lines:
            a = 3
            b = 5
            print(a * b)
            print('Hello with statement')
    <BLANKLINE>

    When ``capture_target=True`` the names of the target variable(s) are
    also returned.  ``capture_block()`` yields two outputs--the lines of
    the with statement, and a tuple of the names of the targets, if any.

    Example
    -------

    >>> def example():
    ...     with capture_block(capture_target=True) as (lines, target):
    ...         pass
    ...     print(''.join(lines))
    ...     print(target)
    >>> example()
            pass
    <BLANKLINE>
    ('lines', 'target')
    """

    # Two frames up from this generator; presumably that skips contextlib's
    # ``__enter__`` wrapper and lands on the code running the with statement.
    f = inspect.currentframe().f_back.f_back
    start_lineno = f.f_lineno
    old_trace = sys.gettrace()
    # Remember the frame-local tracer so it can be put back afterwards.
    old_local_trace = f.f_trace
    code = f.f_code
    # bytearray indexes to ints for both ``str`` and ``bytes`` code strings,
    # so no per-version conversion is needed.
    bytecode = bytearray(code.co_code)

    # NOTE(review): everything below assumes an instruction is one opcode
    # byte optionally followed by a two-byte little-endian argument, and that
    # SETUP_WITH's argument is a byte delta from the next instruction.
    # Confirm the interpreter in use still encodes bytecode that way.

    def getarg(offset):
        # Decode the unsigned 16-bit argument of the instruction at offset.
        return bytecode[offset + 1] | (bytecode[offset + 2] << 8)

    def instruction_size(opcode):
        return 3 if opcode >= dis.HAVE_ARGUMENT else 1

    def determine_target(offset):
        # Returns (names, next_offset) so that callers unpacking a sequence
        # can step over nested targets of any size.
        names = []
        opcode = bytecode[offset]
        next_offset = offset + instruction_size(opcode)
        if opcode == _STORE_FAST:
            names.append(code.co_varnames[getarg(offset)])
        elif opcode == _STORE_NAME:
            names.append(code.co_names[getarg(offset)])
        elif opcode == _UNPACK_SEQUENCE:
            for _ in range(getarg(offset)):
                nested, next_offset = determine_target(next_offset)
                names.extend(nested)
        return names, next_offset

    def check_opcode_range(begin, end):
        # Checks the given range of opcode bytes for with statement
        # startups.  Returns (end offset of the block, target names), or
        # (None, None) when the range holds no SETUP_WITH so the caller can
        # keep scanning.
        idx = begin
        while idx < end:
            opcode = bytecode[idx]
            if opcode == _SETUP_WITH:
                target = None
                if capture_target:
                    names, _ = determine_target(idx + 3)
                    target = tuple(names)
                return getarg(idx) + idx + 3, target
            idx += instruction_size(opcode)
        return None, None

    prev_offset = None
    with_end = None
    target = None
    end_lineno = None
    last_lineno = start_lineno

    for offset, lineno in dis.findlinestarts(code):
        if lineno < start_lineno:
            # Skip ahead for the start line (the current line of the frame)
            continue

        last_lineno = max(last_lineno, lineno)

        if prev_offset is None:
            prev_offset = offset
            continue

        if with_end is None:
            with_end, target = check_opcode_range(prev_offset, offset)
        elif offset >= with_end:
            # First line that starts at or past the end of the block.
            end_lineno = lineno - 1
            break

        prev_offset = offset

    if with_end is None:
        raise RuntimeError(
            'capture_block() could not locate a with statement at or after '
            'line %d of %s' % (start_lineno, code.co_filename))

    if end_lineno is None:
        # No line starts after the block (it runs to the end of the code
        # object); fall back to the highest line number seen, which is only
        # an approximation if the final statement spans several lines.
        end_lineno = last_lineno

    # ``lines`` is 0-indexed, so slicing from start_lineno drops the with
    # statement line itself and end_lineno is the last line kept.
    lines = linecache.getlines(code.co_filename)[start_lineno:end_lineno]

    def trace(*args):
        raise SkipBlock

    try:
        # A global trace function has to be set for frame-local tracers to
        # run at all; this one does nothing and declines to trace new frames.
        sys.settrace(lambda *args, **kwargs: None)
        f.f_trace = trace

        if capture_target:
            yield lines, target
        else:
            yield lines
    except SkipBlock:
        pass
    finally:
        # Be kind: put back both the frame-local and the global tracer so a
        # debugger or coverage tool active before the with statement does
        # not see SkipBlock raised on the lines that follow it.
        f.f_trace = old_local_trace
        sys.settrace(old_trace)
        # Drop the frame reference to avoid keeping a reference cycle alive.
        del f
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment