Skip to content

Instantly share code, notes, and snippets.

@msg555
Last active December 8, 2019 07:46
Show Gist options
  • Save msg555/ad15d6a6ac0a0aa1461eec8cd52a168d to your computer and use it in GitHub Desktop.
Save msg555/ad15d6a6ac0a0aa1461eec8cd52a168d to your computer and use it in GitHub Desktop.
Solution to day 7
import abc
import asyncio
import itertools
import sys
class Instruction(metaclass=abc.ABCMeta):
    """Base class for one decoded instruction of a computer's program.

    An instruction owns ``PARAMS`` consecutive parameter cells of
    ``comp.code`` starting at ``offset``.  Each parameter has a mode flag:

    * ``"0"`` - the cell holds an address into ``comp.code``;
    * ``"1"`` - the cell is itself the operand.

    Subclasses set ``PARAMS`` and implement :meth:`run`.
    """

    # Number of parameter cells this instruction uses; overridden by subclasses.
    PARAMS = 0

    def __init__(self, comp, offset, mode):
        """Bind the instruction to its parameter cells.

        Args:
            comp: Object exposing the program memory as the list ``comp.code``.
            offset: Index in ``comp.code`` of the first parameter cell.
            mode: Integer whose decimal digits, lowest digit first, are the
                mode flags of the parameters.  Missing digits count as ``0``.

        Raises:
            RuntimeError: If the parameters extend past the end of memory, or
                a parameter's mode flag is neither ``0`` nor ``1``.
        """
        self.comp = comp
        self.offset = offset
        # Reverse the digits so that self.mode[i] is the flag of parameter i,
        # then pad with "0" for parameters whose digit was omitted.
        digits = str(mode)[::-1]
        self.mode = digits + "0" * (self.PARAMS - len(digits))
        if self.offset + self.PARAMS > len(comp.code):
            raise RuntimeError("Instruction leaves memory")
        # Anything other than "0"/"1" (digits 2-9, or the "-" of a negative
        # mode) used to be silently treated as a direct operand.
        if any(flag not in "01" for flag in self.mode[:self.PARAMS]):
            raise RuntimeError("Invalid parameter mode")

    def _check_index(self, index):
        """Raise ``RuntimeError`` unless ``index`` names one of the parameters."""
        if not 0 <= index < self.PARAMS:
            raise RuntimeError("Invalid argument access")

    def _check_address(self, address):
        """Raise ``RuntimeError`` unless ``address`` lies inside ``comp.code``."""
        if not 0 <= address < len(self.comp.code):
            raise RuntimeError("Reference outside of memory")

    def get_arg(self, index):
        """Return the operand value of parameter ``index``.

        In mode ``"0"`` the parameter cell is dereferenced as an address;
        otherwise the cell's own value is returned.

        Raises:
            RuntimeError: If ``index`` is not a valid parameter index, or the
                address held by the cell is outside memory.
        """
        self._check_index(index)
        val = self.comp.code[self.offset + index]
        if self.mode[index] == "0":
            self._check_address(val)
            val = self.comp.code[val]
        return val

    def set_arg(self, index, val):
        """Store ``val`` at the location designated by parameter ``index``.

        In mode ``"0"`` the destination is the address held by the parameter
        cell; otherwise the parameter cell itself is overwritten.

        Raises:
            RuntimeError: If ``index`` is not a valid parameter index, or the
                destination is outside memory.
        """
        self._check_index(index)
        ind = self.offset + index
        if self.mode[index] == "0":
            ind = self.comp.code[ind]
        self._check_address(ind)
        self.comp.code[ind] = val

    @abc.abstractmethod
    async def run(self):
        """Execute the instruction and return its outcome.

        The value is interpreted by ``Computer.run``: ``None`` continues with
        the next instruction, ``True`` halts, and any other value is the
        index to jump to.
        """
def _make_op(params, run):
    """Build an ``Instruction`` subclass from a plain synchronous callable.

    Args:
        params: Number of parameter cells the instruction uses.
        run: Callable taking the instruction instance; its return value
            becomes the result of the generated coroutine ``run`` method.

    Returns:
        A new ``Instruction`` subclass named ``OpDyn``.
    """
    # Keep the callable under its own name; inside the class the method is
    # also called ``run``.
    handler = run

    class OpDyn(Instruction):
        PARAMS = params

        async def run(self):
            outcome = handler(self)
            return outcome

    return OpDyn
# Handlers for the instructions that need no awaiting.  Each receives the
# instruction instance; the return value is the instruction's outcome.


def _halt(_ins):
    """Signal that the program should stop."""
    return True


def _add(ins):
    """Store arg0 + arg1 through parameter 2."""
    return ins.set_arg(2, ins.get_arg(0) + ins.get_arg(1))


def _mul(ins):
    """Store arg0 * arg1 through parameter 2."""
    return ins.set_arg(2, ins.get_arg(0) * ins.get_arg(1))


def _jump_if_true(ins):
    """Return arg1 as the jump target when arg0 is non-zero, else None."""
    if ins.get_arg(0) != 0:
        return ins.get_arg(1)
    return None


def _jump_if_false(ins):
    """Return arg1 as the jump target when arg0 is zero, else None."""
    if ins.get_arg(0) == 0:
        return ins.get_arg(1)
    return None


def _less_than(ins):
    """Store 1 through parameter 2 if arg0 < arg1, otherwise 0."""
    return ins.set_arg(2, int(ins.get_arg(0) < ins.get_arg(1)))


def _equals(ins):
    """Store 1 through parameter 2 if arg0 == arg1, otherwise 0."""
    return ins.set_arg(2, int(ins.get_arg(0) == ins.get_arg(1)))


OpHalt = _make_op(0, _halt)
OpAdd = _make_op(3, _add)
OpMul = _make_op(3, _mul)
OpJIT = _make_op(2, _jump_if_true)
OpJIF = _make_op(2, _jump_if_false)
OpLT = _make_op(3, _less_than)
OpEQ = _make_op(3, _equals)
class OpRead(Instruction):
    """Instruction that awaits one value from the computer's input and stores it."""

    PARAMS = 1

    async def run(self):
        """Wait for ``comp.read()`` and store the value through parameter 0."""
        received = await self.comp.read()
        self.set_arg(0, received)
class OpWrite(Instruction):
    """Instruction that sends the value of its parameter to the computer's output."""

    PARAMS = 1

    async def run(self):
        """Read parameter 0 and await ``comp.write()`` with that value."""
        outgoing = self.get_arg(0)
        await self.comp.write(outgoing)
class Computer:
    """Interpreter that executes a program held as a list of integers.

    Input and output go through two queue-like objects, so several computers
    can be chained by sharing queues between them.
    """

    # Maps the low two decimal digits of an instruction word to its class.
    OPCODES = {
        1: OpAdd,
        2: OpMul,
        3: OpRead,
        4: OpWrite,
        5: OpJIT,
        6: OpJIF,
        7: OpLT,
        8: OpEQ,
        99: OpHalt,
    }

    def __init__(self, code, input_queue, output_queue):
        """Create a computer with its own copy of the program.

        Args:
            code: Iterable of integers; copied so that self-modification does
                not affect the caller's sequence.
            input_queue: Object with an awaitable ``get()`` supplying input.
            output_queue: Object with an awaitable ``put(val)`` taking output.
        """
        self.code = list(code)
        self.input_queue = input_queue
        self.output_queue = output_queue

    async def run(self, ip=0):
        """Execute instructions starting at ``ip`` until one requests a halt.

        Args:
            ip: Index in ``self.code`` of the first instruction word.

        Raises:
            RuntimeError: If the instruction pointer leaves memory, the
                instruction word is negative, the opcode is unknown, or the
                instruction itself reports a fault.
        """
        while True:
            if ip < 0 or ip >= len(self.code):
                raise RuntimeError("IP left memory")
            word = self.code[ip]
            # Floor division/modulo would decode a negative word as some
            # unrelated opcode (e.g. -1 as 99), so reject it explicitly.
            if word < 0:
                raise RuntimeError("Invalid instruction")
            # Low two digits select the opcode; the rest are parameter modes.
            opcode = word % 100
            op_class = self.OPCODES.get(opcode)
            if op_class is None:
                raise RuntimeError("Unknown opcode {}".format(opcode))
            op = op_class(self, ip + 1, word // 100)
            result = await op.run()
            if result is None:
                # Ordinary instruction: step over the opcode and its parameters.
                ip += 1 + op.PARAMS
            elif result is True:
                break
            else:
                # Any other value is a jump target.
                ip = result

    async def read(self):
        """Wait for and return the next value from the input queue."""
        res = await self.input_queue.get()
        return res

    async def write(self, val):
        """Put ``val`` on the output queue, waiting if the queue requires it."""
        await self.output_queue.put(val)
async def _run_ring(code, phases):
    """Run one ring of computers and return the value left for the first one.

    One computer is created per entry of ``phases``.  Each reads from its own
    queue and writes to the queue of the next computer, the last one feeding
    the first.  Every queue is seeded with its phase value, and the first
    queue additionally receives the initial signal ``0``.

    The queues are created inside the coroutine so they belong to the event
    loop that awaits them.
    """
    queues = [asyncio.Queue() for _ in phases]
    for phase, queue in zip(phases, queues):
        queue.put_nowait(phase)
    queues[0].put_nowait(0)
    comps = [
        Computer(code, queue, queues[(ind + 1) % len(queues)])
        for ind, queue in enumerate(queues)
    ]
    await asyncio.gather(*(comp.run() for comp in comps))
    # After every computer has halted, the last output is still waiting in
    # the first computer's input queue.
    return queues[0].get_nowait()


async def _best_signal(code):
    """Return the largest ring output over all orderings of phases 5 to 9."""
    best = 0
    for phases in itertools.permutations(range(5, 10)):
        best = max(best, await _run_ring(code, phases))
    return best


def main():
    """Read a comma-separated program from stdin and print the best signal.

    Only the first line of standard input is read.
    """
    code = [int(x) for x in sys.stdin.readline().strip().split(",")]
    # asyncio.run() creates and closes the event loop itself; calling
    # get_event_loop() without a running loop is deprecated.
    print(asyncio.run(_best_signal(code)))


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment