Solution to day 7
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
import asyncio | |
import itertools | |
import sys | |
class Instruction(metaclass=abc.ABCMeta):
    """Base class for a single instruction bound to a computer's memory.

    An instance remembers the owning computer (any object exposing a mutable
    ``code`` sequence), the memory offset of its first parameter, and one
    mode digit per parameter. Subclasses set ``PARAMS`` to the number of
    parameter cells they consume and implement ``run``.
    """

    # Number of parameter cells this instruction occupies.
    PARAMS = 0

    def __init__(self, comp, offset, mode):
        """Attach the instruction to ``comp`` with parameters at ``offset``.

        ``mode`` is rendered as decimal digits, reversed so that index 0 is
        the lowest digit, and right-padded with ``"0"`` up to ``PARAMS``
        digits.

        Raises:
            RuntimeError: if the parameters would extend past the end of
                ``comp.code``.
        """
        digits = str(mode)[::-1]
        self.comp = comp
        self.offset = offset
        self.mode = digits.ljust(self.PARAMS, "0")
        if offset + self.PARAMS > len(comp.code):
            raise RuntimeError("Instruction leaves memory")

    def _check_index(self, index):
        """Raise ``RuntimeError`` unless ``index`` names one of the parameters."""
        if not 0 <= index < self.PARAMS:
            raise RuntimeError("Invalid argument access")

    def get_arg(self, index):
        """Return the value of parameter ``index``.

        With mode digit ``"0"`` the parameter cell holds an address and the
        value stored at that address is returned; with any other digit the
        cell's own content is returned.

        Raises:
            RuntimeError: if ``index`` is not a valid parameter index, or the
                address read from the cell lies outside memory.
        """
        self._check_index(index)
        memory = self.comp.code
        raw = memory[self.offset + index]
        if self.mode[index] != "0":
            return raw
        if not 0 <= raw < len(memory):
            raise RuntimeError("Reference outside of memory")
        return memory[raw]

    def set_arg(self, index, val):
        """Store ``val`` at the location designated by parameter ``index``.

        With mode digit ``"0"`` the parameter cell holds the destination
        address; with any other digit the parameter cell itself is
        overwritten.

        Raises:
            RuntimeError: if ``index`` is not a valid parameter index, or the
                destination lies outside memory.
        """
        self._check_index(index)
        memory = self.comp.code
        cell = self.offset + index
        target = memory[cell] if self.mode[index] == "0" else cell
        if not 0 <= target < len(memory):
            raise RuntimeError("Reference outside of memory")
        memory[target] = val

    @abc.abstractmethod
    async def run(self):
        """Execute the instruction; concrete subclasses must override this."""
def _make_op(params, run):
    """Create an ``Instruction`` subclass from a parameter count and a callable.

    Args:
        params: value assigned to the new class's ``PARAMS`` attribute.
        run: synchronous callable taking the instruction instance; whatever
            it returns becomes the result of the class's async ``run``.

    Returns:
        The newly created ``Instruction`` subclass.
    """
    handler = run

    class OpDyn(Instruction):
        PARAMS = params

        async def run(self):
            # Delegate to the plain callable and hand its result back as-is.
            return handler(self)

    return OpDyn
def _binary_op(combine):
    """Build a 3-parameter op that stores ``combine(arg0, arg1)`` into arg 2."""
    return _make_op(
        3,
        lambda ins: ins.set_arg(2, combine(ins.get_arg(0), ins.get_arg(1))),
    )


# Halt: the handler returns True.
OpHalt = _make_op(0, lambda _ins: True)

# Arithmetic: the result is written through the third parameter.
OpAdd = _binary_op(lambda lhs, rhs: lhs + rhs)
OpMul = _binary_op(lambda lhs, rhs: lhs * rhs)

# Conditional jumps: the handler returns the second argument when the
# condition on the first argument holds, otherwise None.
OpJIT = _make_op(2, lambda ins: ins.get_arg(1) if ins.get_arg(0) != 0 else None)
OpJIF = _make_op(2, lambda ins: None if ins.get_arg(0) != 0 else ins.get_arg(1))

# Comparisons: 1 or 0 is written through the third parameter.
OpLT = _binary_op(lambda lhs, rhs: 1 if lhs < rhs else 0)
OpEQ = _binary_op(lambda lhs, rhs: 1 if lhs == rhs else 0)
class OpRead(Instruction):
    """Instruction that reads one value from the computer and stores it."""

    PARAMS = 1

    async def run(self):
        """Await ``comp.read()`` and store the value through parameter 0."""
        received = await self.comp.read()
        self.set_arg(0, received)
class OpWrite(Instruction):
    """Instruction that sends the value of its single parameter to the computer."""

    PARAMS = 1

    async def run(self):
        """Fetch parameter 0 and await ``comp.write()`` with it."""
        outgoing = self.get_arg(0)
        await self.comp.write(outgoing)
class Computer:
    """Interpreter that executes a program held in ``code``.

    Input values are taken from ``input_queue`` and output values are put on
    ``output_queue``; both are awaited, so they are expected to offer
    awaitable ``get`` / ``put`` methods.
    """

    # Maps the two low decimal digits of an instruction cell to its class.
    OPCODES = {
        1: OpAdd,
        2: OpMul,
        3: OpRead,
        4: OpWrite,
        5: OpJIT,
        6: OpJIF,
        7: OpLT,
        8: OpEQ,
        99: OpHalt,
    }

    def __init__(self, code, input_queue, output_queue):
        """Copy ``code`` into private memory and remember both queues."""
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.code = list(code)

    async def run(self, ip=0):
        """Execute instructions starting at ``ip`` until one signals a halt.

        Each instruction's result steers the loop: ``True`` stops execution,
        ``None`` advances past the opcode and its parameters, and any other
        value becomes the new instruction pointer.

        Raises:
            RuntimeError: if the instruction pointer leaves memory.
        """
        while True:
            if not 0 <= ip < len(self.code):
                raise RuntimeError("IP left memory")
            # Low two digits select the opcode; the remaining digits are the
            # parameter modes.
            modes, opcode = divmod(self.code[ip], 100)
            instruction = self.OPCODES[opcode](self, ip + 1, modes)
            outcome = await instruction.run()
            if outcome is True:
                return
            if outcome is None:
                ip += 1 + instruction.PARAMS
            else:
                ip = outcome

    async def read(self):
        """Return the next value from the input queue, waiting if necessary."""
        return await self.input_queue.get()

    async def write(self, val):
        """Put ``val`` on the output queue."""
        await self.output_queue.put(val)
async def _run_feedback_loop(code, phases):
    """Run one ring of computers for ``phases`` and return the final signal.

    One queue is created per phase value. Each queue is seeded with its
    phase, and the first queue additionally receives the initial signal 0.
    Computer ``k`` reads from queue ``k`` and writes to queue ``k + 1``, with
    the last computer writing back to the first queue. After every computer
    has finished, the value left on the first queue is returned.
    """
    # The queues are created inside the running event loop so that they are
    # associated with the loop that actually drives the computers.
    queues = [asyncio.Queue() for _ in phases]
    for phase, queue in zip(phases, queues):
        queue.put_nowait(phase)
    queues[0].put_nowait(0)

    comps = [
        Computer(code, queue, queues[(ind + 1) % len(queues)])
        for ind, queue in enumerate(queues)
    ]
    await asyncio.gather(*(comp.run() for comp in comps))
    return queues[0].get_nowait()


async def _best_signal(code):
    """Return the largest final signal over all orderings of phases 5..9."""
    best = 0
    for phases in itertools.permutations(range(5, 10)):
        best = max(best, await _run_feedback_loop(code, phases))
    return best


def main():
    """Read a comma-separated program from stdin and print the best signal."""
    code = [int(x) for x in sys.stdin.readline().strip().split(",")]
    # asyncio.run creates and closes its own event loop; calling
    # asyncio.get_event_loop() with no running loop is deprecated.
    print(asyncio.run(_best_signal(code)))


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment