Skip to content

Instantly share code, notes, and snippets.

@mikejs
Created April 14, 2011 00:25
Show Gist options
  • Save mikejs/918701 to your computer and use it in GitHub Desktop.
Save mikejs/918701 to your computer and use it in GitHub Desktop.
import os
try:
from pypy.rlib.jit import JitDriver, purefunction, hint
except ImportError:
# If pypy isn't importable then (presumably) we're not being translated,
# so make JIT stuff no-ops.
class JitDriver(object):
def __init__(self, **kwargs):
pass
def jit_merge_point(self, **kwargs):
pass
def purefunction(f):
return f
def hint(first, *args, **kwargs):
return first
def get_printable_location(pc, opcodes, labels):
"""
Returns a string representing the current state of the interpreter,
for use in JIT trace logs.
"""
opcode = opcodes[pc]
return "%d:(%d, %d, '%s')" % (pc, opcode[0], opcode[1], opcode[2])
jitdriver = JitDriver(greens=['pc', 'opcodes', 'labels'],
reds=['stack', 'heap', 'ret_stack'],
get_printable_location=get_printable_location)
(OP_NOP, OP_PUSH, OP_DUP, OP_COPY, OP_SWAP, OP_DISCARD, OP_SLIDE,
OP_ADD, OP_SUB, OP_MUL, OP_DIV, OP_MOD, OP_STORE, OP_RETR, OP_MARK,
OP_CALL, OP_JMP, OP_JMPZ, OP_JMPN, OP_END, OP_END_PROG, OP_OUT_CHAR,
OP_OUT_NUM, OP_READ_CHAR, OP_READ_NUM) = range(25)
class Stack(object):
def __init__(self, size=100000):
self._stack = [0] * size
self._size = size
self._pos = 0
def push(self, num):
pos = self._pos
assert pos < self._size
self._stack[pos] = num
self._pos = pos + 1
def pop(self):
top = self._pos - 1
assert top >= 0
self._pos = top
return self._stack[top]
def swap(self):
top = self._pos - 1
second = top - 1
assert second >= 0
temp = self._stack[top]
self._stack[top] = self._stack[second]
self._stack[second] = temp
def dup(self):
top = self._pos - 1
assert top >= 0
self.push(self._stack[top])
@purefunction
def get_matching_label(labels, label):
return labels[label]
def interpret(opcodes, labels):
pc = 0
stack = Stack()
ret_stack = []
heap = {}
while pc < len(opcodes):
jitdriver.jit_merge_point(pc=pc, stack=stack, opcodes=opcodes,
labels=labels, heap=heap,
ret_stack=ret_stack)
code = opcodes[pc]
op = code[0]
stack._pos = hint(stack._pos, promote=True)
if op == OP_PUSH:
stack.push(code[1])
elif op == OP_DUP:
stack.dup()
elif op == OP_SWAP:
stack.swap()
elif op == OP_DISCARD:
stack.pop()
elif op == OP_ADD:
stack.push(stack.pop() + stack.pop())
elif op == OP_SUB:
right = stack.pop()
left = stack.pop()
stack.push(left - right)
elif op == OP_MUL:
stack.push(stack.pop() * stack.pop())
elif op == OP_DIV:
right = stack.pop()
left = stack.pop()
stack.push(left / right)
elif op == OP_MOD:
right = stack.pop()
left = stack.pop()
stack.push(left % right)
elif op == OP_JMP:
pc = get_matching_label(labels, code[2])
elif op == OP_JMPZ:
if stack.pop() == 0:
pc = get_matching_label(labels, code[2])
elif op == OP_JMPN:
if stack.pop() < 0:
pc = get_matching_label(labels, code[2])
elif op == OP_STORE:
value = stack.pop()
addr = stack.pop()
heap[addr] = value
elif op == OP_RETR:
addr = stack.pop()
stack.push(heap[addr])
elif op == OP_OUT_NUM:
num = "%d" % stack.pop()
os.write(1, num)
elif op == OP_OUT_CHAR:
os.write(1, chr(stack.pop()))
elif op == OP_READ_NUM:
addr = stack.pop()
ch = os.read(0, 1)[0]
num = ''
while ch >= '0' and ch <= '9':
num += ch
ch = os.read(0, 1)[0]
heap[addr] = int(num)
elif op == OP_READ_CHAR:
addr = stack.pop()
heap[addr] = ord(os.read(0, 1)[0])
elif op == OP_MARK or op == OP_NOP:
pass
elif op == OP_CALL:
ret_stack.append(pc)
pc = get_matching_label(labels, code[2])
elif op == OP_END:
pc = ret_stack.pop()
elif op == OP_END_PROG:
return
else:
print "BAD OP: %d" % op
pc += 1
class CodeIter(object):
"""
Iterate over whitespace operators, skipping non-whitespace characters.
"""
def __init__(self, code):
self._iter = iter(code)
def __iter__(self):
return self
def next(self):
while True:
c = self._iter.next()
if c == '\t' or c == ' ' or c == '\n':
return c
def parse(code_str):
opcodes = []
labels = {}
code = CodeIter(code_str)
while True:
try:
tok = code.next()
except StopIteration:
return opcodes, labels
if tok == ' ':
opcodes.append(parse_stack(code))
elif tok == '\t':
sub = code.next()
if sub == ' ':
opcodes.append(parse_arithmetic(code))
elif sub == '\t':
opcodes.append(parse_heap(code))
elif sub == '\n':
opcodes.append(parse_io(code))
elif tok == '\n':
flow_op = parse_flow(code)
if flow_op[0] == OP_MARK:
labels[flow_op[2]] = len(opcodes)
opcodes.append(flow_op)
def parse_number(code):
sign = 1 if code.next() == ' ' else -1
num = 0
tok = code.next()
while tok != '\n':
if tok == ' ':
num = num * 2
elif tok == '\t':
num = num * 2 + 1
tok = code.next()
return num * sign
def parse_stack(code):
tok = code.next()
if tok == ' ':
return (OP_PUSH, parse_number(code), None)
elif tok == '\n':
sub = code.next()
if sub == ' ':
return (OP_DUP, 0, None)
elif sub == '\t':
return (OP_SWAP, 0, None)
elif sub == '\n':
return (OP_DISCARD, 0, None)
elif tok == '\t':
sub2 = code.next()
if sub2 == ' ':
return (OP_COPY, parse_number(code), None)
elif sub2 == '\n':
return (OP_SLIDE, parse_number(code), None)
return (OP_NOP, 0, None)
def parse_arithmetic(code):
first = code.next()
second = code.next()
if first == ' ':
if second == ' ':
return (OP_ADD, 0, None)
elif second == '\t':
return (OP_SUB, 0, None)
elif second == '\n':
return (OP_MUL, 0, None)
elif first == '\t':
if second == ' ':
return (OP_DIV, 0, None)
elif second == '\t':
return (OP_MOD, 0, None)
return (OP_NOP, 0, None)
def parse_heap(code):
tok = code.next()
if tok == ' ':
return (OP_STORE, 0, None)
elif tok == '\t':
return (OP_RETR, 0, None)
return (OP_NOP, 0, None)
def parse_label(code):
label = []
tok = code.next()
while tok != '\n':
label.append(tok)
tok = code.next()
return ''.join(label)
def parse_flow(code):
tok = code.next()
if tok == ' ':
sub = code.next()
label = parse_label(code)
if sub == ' ':
return (OP_MARK, 0, label)
elif sub == '\t':
return (OP_CALL, 0, label)
elif sub == '\n':
return (OP_JMP, 0, label)
elif tok == '\t':
sub2 = code.next()
if sub2 == ' ':
return (OP_JMPZ, 0, parse_label(code))
elif sub2 == '\t':
return (OP_JMPN, 0, parse_label(code))
elif sub2 == '\n':
return (OP_END, 0, None)
elif tok == '\n':
if code.next() == '\n':
return (OP_END_PROG, 0, None)
return (OP_NOP, 0, None)
def parse_io(code):
first = code.next()
second = code.next()
if first == ' ':
if second == ' ':
return (OP_OUT_CHAR, 0, None)
elif second == '\t':
return (OP_OUT_NUM, 0, None)
elif first == '\t':
if second == ' ':
return (OP_READ_CHAR, 0, None)
elif second == '\t':
return (OP_READ_NUM, 0, None)
return (OP_NOP, 0, None)
def run(fp):
program_contents = ""
while True:
read = os.read(fp, 4096)
if len(read) == 0:
break
program_contents += read
os.close(fp)
opcodes, labels = parse(program_contents)
interpret(opcodes, labels)
def entry_point(argv):
try:
filename = argv[1]
except IndexError:
print "usage: nbspy.py <filename>"
return 1
run(os.open(filename, os.O_RDONLY, 0777))
return 0
def target(*args):
return entry_point, None
def jitpolicy(driver):
from pypy.jit.codewriter.policy import JitPolicy
return JitPolicy()
if __name__ == '__main__':
import sys
entry_point(sys.argv)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment