Skip to content

Instantly share code, notes, and snippets.

@Bobbias
Created August 13, 2020 20:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Bobbias/9debc8e16de7a0ee9b4a09bdad584e89 to your computer and use it in GitHub Desktop.
Save Bobbias/9debc8e16de7a0ee9b4a09bdad584e89 to your computer and use it in GitHub Desktop.
import logging
from collections import deque
from copy import *
from enums import OpcodeEnum, AddressMode, BreakOn, opcode_length, opcode_to_fname
from helpers import pad, is_relevant, process_intcode_exception
if not __name__ == '__main__':
print('IntcodeComputer loading')
##############################################
# Computer
##############################################
class IntcodeComputer:
"""
todo: write docstring
"""
def __init__(self, program: list, computer_id: int = 0, break_on=BreakOn.OUTPUT):
self.id = computer_id
self.program = pad(deepcopy(program), 1024 * 1024 * 50, 0)
self.pc: int = 0
self.relative_base: int = 0
self.halted: bool = False
self.logger = logging.getLogger(f'Computer {computer_id}')
self.logger.setLevel('DEBUG')
self.started = False
self.break_on = break_on
self.is_break = False
self.logger.debug(f'break_on: {break_on.name}')
self.waiting_for_input = False
self.funcs = self._get_func_dict()
def get_params(self):
"""
:return:
"""
opcode: int = self.program[self.pc]
self.logger.debug(f'Opcode is: {opcode}')
op_enum: OpcodeEnum = OpcodeEnum(opcode % 100)
divisor: int = 100
num_params: int = (opcode_length[op_enum] - 1)
params: list = [0] * num_params
dest: int = 0
for i in range(opcode_length[op_enum] - 1):
mode: int = opcode // divisor % 10
value: int = self.program[self.pc + 1 + i]
if AddressMode(mode) == AddressMode.POSITION:
assert value < len(self.program), \
f'Attempt to access position {value} when program size is {len(self.program)} in computer {self.id}.'
params[i] = self.program[value]
dest: int = value # this is a dirty hack lifted from trevor's implementation.
# it assumes the destination is the last positional parameter.
# it overwrites itself and whatever the final value is gets used.
self.logger.info(f'Position params[{i}] = {self.program[value]}, dest = {dest}')
elif AddressMode(mode) == AddressMode.IMMEDIATE:
params[i] = value
self.logger.info(f'Immediate params[{i}] = {value}')
elif AddressMode(mode) == AddressMode.RELATIVE:
# assert False, 'Relative mode is not supported yet.'
params[i] = self.program[value + self.relative_base]
dest: int = value + self.relative_base
self.logger.info(f'Relative params[{i}] = {self.program[value + self.relative_base]}, '
f'value = {value}, relative base = {self.relative_base}, dest = {dest}')
divisor *= 10
return params, dest
def _log(self, opcode, fun, *args, **kwargs):
self.logger.info(f'[{fun.__name__}] {opcode.name} executed.')
return fun(self, *args, **kwargs)
# noinspection SpellCheckingInspection
def run(self, input_queue: deque):
"""
Takes a queue for input and output functionality, loops through the computer's program until it hits an
instruction which the computer must stop at temporarily. Must be called continually until computer.halted is
True.
:param input_queue: The queue to use for communication.
:return: none
"""
if not self.started:
self.started = True
self.logger.info(f'Beginning execution on computer {self.id}')
try:
while True:
self.logger.info('===============\n\t\tNEW INSTRUCTION')
opcode = self.program[self.pc]
self.logger.debug(f'Raw Opcode input: {opcode}, {opcode % 100}')
assert OpcodeEnum(
opcode % 100) in OpcodeEnum, f'Opcode {opcode % 100} at position {self.pc} in computer ' \
f'{self.id} is not a known opcode.'
parameters, dest = self.get_params()
opcode = OpcodeEnum(opcode % 100)
self.logger.info(f'PC: {self.pc}')
self._conditional_dispatch(opcode, self.funcs[opcode_to_fname[opcode]],
params=parameters,
io_queue=input_queue,
dest=dest)
# note: consider refactor that can remove those kwargs from above.
# note: this could involve wrapping those things in some kind of state
# note: containing object.
if self.is_break:
return
self.pc += opcode_length[opcode]
self.logger.debug(f'Increased PC by: {opcode_length[opcode]}')
except (IndexError, ValueError) as err:
process_intcode_exception(self.logger, self.pc, self.program, err)
def _hlt(self, **kwargs):
self.halted = True
##############################################
# Internal functions
##############################################
# takes the current opcode, and the function for that opcode.
# checks if we break on that opcode or not
# returns True when we are breaking
# runs the function at the correct time
# must take into account the fact that output breaks late, others break early
def _conditional_dispatch(self, opcode: OpcodeEnum, fun, *args, **kwargs):
op = BreakOn[opcode.name]
if op == BreakOn.OUTPUT and not self.is_break:
self.is_break = True
self._log(opcode, fun, *args, **kwargs)
return True
elif op == BreakOn.OUTPUT and self.is_break:
self.logger.info(f'Resuming from break on {op.name}')
self.is_break = False
return
elif self.break_on & op and not self.is_break:
self.logger.info(f'Breaking on {op.name}')
self.is_break = True
return True
else:
self.is_break = False
return self._log(opcode, fun, *args, **kwargs)
def _in(self, *, dest, io_queue, **kwargs):
input_value = io_queue.popleft()
self.logger.debug(f'IN program[{dest}] = {self.program[dest]} value = {input_value}')
self.program[dest] = input_value
def _rb(self, *, params, **kwargs):
self.relative_base += params[0]
assert self.relative_base > 0
self.logger.debug(f'RB relative base = {self.relative_base}')
def _eq(self, *, dest, params, **kwargs):
self.program[dest] = 1 if params[0] == params[1] else 0
self.logger.debug(f'EQ program[{dest}] = {self.program[dest]}')
def _lt(self, *, dest, params, **kwargs):
self.program[dest] = 1 if params[0] < params[1] else 0
self.logger.debug(f'LT program[{dest}] = {self.program[dest]}')
def _jz(self, *, params, **kwargs):
if not params[0]:
self.logger.debug(f'JZ PC = {params[1]}')
self.pc = params[1]
else:
self.logger.debug(f'JZ continue')
def _jnz(self, *, params, **kwargs):
if params[0]:
self.logger.debug(f'JNZ PC = {params[1]}')
self.pc = params[1]
else:
self.logger.debug(f'JNZ Continue')
def _out(self, *, io_queue, params, **kwargs):
io_queue.append(params[0])
def _mul(self, *, dest, params, **kwargs):
self.logger.info(f'MUL {params[0]} * {params[1]} = {self.program[dest]}')
self.program[dest] = params[0] * params[1]
def _add(self, *, dest, params, **kwargs):
self.logger.info(f'ADD {params[0]} + {params[1]} = {self.program[dest]}')
self.program[dest] = params[0] + params[1]
def _get_func_dict(self):
return {k: v for k, v in filter(is_relevant, self.__class__.__dict__.items())}
##############################################
# Main
##############################################
if __name__ == '__main__':
cmp = IntcodeComputer([99])
print(cmp._get_func_dict())
from enum import Enum, IntFlag, auto
class OpcodeEnum(Enum):
ADD = 1
MUL = 2
INPUT = 3
OUTPUT = 4
JNZ = 5
JZ = 6
LT = 7
EQ = 8
RB = 9
HALT = 99
class AddressMode(Enum):
POSITION = 0
IMMEDIATE = 1
RELATIVE = 2
class BreakOn(IntFlag):
ADD = auto()
MUL = auto()
INPUT = auto()
OUTPUT = auto()
JNZ = auto()
JZ = auto()
LT = auto()
EQ = auto()
RB = auto()
HALT = auto()
opcode_to_fname = {
OpcodeEnum.ADD: '_add',
OpcodeEnum.MUL: '_mul',
OpcodeEnum.INPUT: '_in',
OpcodeEnum.OUTPUT: '_out',
OpcodeEnum.JNZ: '_jnz',
OpcodeEnum.JZ: '_jz',
OpcodeEnum.LT: '_lt',
OpcodeEnum.EQ: '_eq',
OpcodeEnum.RB: '_rb',
OpcodeEnum.HALT: '_hlt',
}
opcode_length = {OpcodeEnum.ADD: 4,
OpcodeEnum.MUL: 4,
OpcodeEnum.LT: 4,
OpcodeEnum.EQ: 4,
OpcodeEnum.JNZ: 3,
OpcodeEnum.JZ: 3,
OpcodeEnum.INPUT: 2,
OpcodeEnum.OUTPUT: 2,
OpcodeEnum.RB: 2,
OpcodeEnum.HALT: 1}
import logging
import sys
from itertools import chain, repeat, islice
import re
import enums
def pad_infinite(iterable, padding=None):
return chain(iterable, repeat(padding))
def pad(iterable, size, padding=None):
return list(islice(pad_infinite(iterable, padding), size))
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
def setup_logger(name, log_file, level=logging.INFO):
"""To setup as many loggers as you want"""
handler = logging.FileHandler(log_file)
handler.setFormatter(formatter)
logger = logging.getLogger(name)
logger.setLevel(level)
logger.addHandler(handler)
return logger
def is_relevant(item):
if re.match(r'_(?!.*di|.*log|_).*', item[0]):
return True
else:
return False
def process_intcode_exception(logger, pc, program, err):
logger.exception(err)
logger.error(f'Exception occurred at {pc}')
window_size = 7
start = pc - window_size
end = pc + window_size if len(program) > pc + window_size else len(program)
logger.error(f'{start}:{end}')
logger.error(f'{program[start:end]}')
sys.exit(-1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment