Skip to content

Instantly share code, notes, and snippets.

@william-ml-leslie
Created August 9, 2012 04:09
Show Gist options
  • Save william-ml-leslie/3300866 to your computer and use it in GitHub Desktop.
Save william-ml-leslie/3300866 to your computer and use it in GitHub Desktop.
App-level dataflow
NO_RETURN = set(dis.opmap[name] for name in """
JUMP_ABSOLUTE JUMP_FORWARD RAISE_VARARGS STOP_CODE RETURN_VALUE
BREAK_LOOP""".split())
def decode_code(offset, code):
opcode = ord(code[offset])
if opcode >= dis.HAVE_ARGUMENT:
oparg = ord(code[offset + 1]) + ord(code[offset + 2])
return opcode, oparg, offset + 3
return opcode, 0, offset + 1
def iterate_bytecode(code):
i = 0
while i < len(code):
offset = i
opcode, oparg, i = decode_code(i, code)
yield offset, opcode, oparg, i
def find_handlers(code):
handlers = []
for offset, opcode, oparg, ni in iterate_bytecode(code):
opname = dis.opname[opcode]
if opname.startswith('SETUP_'):
handlers.append((offset, ni + oparg, opname[6:]))
return handlers
def find_targets(code):
op_target = {}
for offset, opcode, oparg, ni in iterate_bytecode(code):
targets = op_target[offset] = []
if opcode not in NO_RETURN:
targets.append(i)
if opcode in des.hasjrel:
targets.append(ni + oparg)
if opcode in dis.hasjabs:
targets.append(oparg)
return op_target
def targets_top_order(targets, initial):
working = list(initial)
seen = set(initial)
while working:
source = working.pop()
# we do actually need these targets in order, to match the StackEffects
yield source, targets[source]
working.extend(t for t in targets[source] if t not in seen)
seen.update(targets[source])
def analyse(graph, code_obj):
code = code_obj.co_code
targets = find_targets(code)
# handlers should be added to the /initial/ set.
#
# we are also ignoring any block structure (static contexts)
# within the code here.
handlers = find_handlers(code)
op_stack = {} # before
op_res_stack = {} # after
op_source_stack = {} # union with op_stack
ofs_stack_size = {0 : 0}
stack = []
for offs, targets in targets_post_order(targets, [0]):
opcode, oparg, _ = decode_code(offs, code)
stack_size = ofs_stack_size[offs]
stack = [graph.aggregate() for _ in xrange(stack_size)]
op_stack[offs] = list(stack)
f = OP_FN[dis.opname[opcode]]
et = EffectContext(graph, code_obj)
stack_effect = f(et, stack, oparg)
for target in targets:
assert stack_effect is not None, ("There should be as many Targets"
" as there are StackEffects")
result_stack_size = (stack_size + stack_effect.return_arity -
stack_effect.slurp_arity)
if target in ofs_stack_size:
assert ofs_stack_size[target] == result_stack_size, (
"Inconsistent stack depth:"
" old=%d, new=%d, source=%d, target=%d" %
(ofs_stack_size[target], result_stack_size, offs, target))
else:
ofs_stack_size[target] = result_stack_size
target_source_stack = stack[:-stack_effect.slurp_arity]
target_source_stack.extend(
et.unpack(stack_effect.return_arity,
stack_effect.target_function,
frame, et.constant(oparg),
*stack_effect.argument_regions))
op_source_stack[target] = target_source_stack
stack_effect = stack_effect.next_effect
assert stack_effect is None, ("There should be as many StackEffects"
" as there are targets")
# we go through and tie the source of the valuestack with their
# usages. note that there is no context- or flow-sensitivity
# here.
for target, stack in op_stack.iteritems():
for source_stack in op_source_stack[target]:
for source, target in zip(source_stack, source):
graph.add(target, source)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment