Last active
December 11, 2019 08:08
-
-
Save e-nomem/6361bba17eeaa9d799f370ede7484b62 to your computer and use it in GitHub Desktop.
Advent of Code 2019
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import IntEnum | |
from typing import Optional | |
from .io import Reader | |
from .io import stdin | |
from .io import stdout | |
from .io import Writer | |
from .utils import Program | |
class ParameterMode(IntEnum):
    """
    Addressing mode of a single instruction parameter, encoded as one decimal
    digit of the instruction value.
    """

    # The parameter holds the address of the operand
    Position = 0
    # The parameter is itself the operand (read-only)
    Immediate = 1
    # The parameter is an offset added to the relative base to get the address
    Relative = 2
class Opcode(IntEnum):
    """
    Operations understood by the interpreter. The numeric value is the two
    lowest decimal digits of an instruction.
    """

    # Arithmetic: store param1 (+|*) param2 into param3
    Add = 1
    Multiply = 2

    # I/O: store one input value into param1 / emit param1
    Input = 3
    Output = 4

    # Control flow: jump to param2 depending on the truthiness of param1
    JmpIfTrue = 5
    JmpIfFalse = 6

    # Comparisons: store 1 into param3 when the test holds, otherwise 0
    LessThan = 7
    Equals = 8

    # Adjust the relative base by param1
    SetRelBase = 9

    # Stop execution
    Halt = 99
async def run(prog: Program, reader: Reader = stdin, writer: Writer = stdout) -> int:
    """
    Runs the provided program until it reaches a Halt instruction and returns
    the index of that Halt instruction.

    The program will be mutated, so ensure that a copy is passed in if
    it should be reused/rerun multiple times.

    `reader` is called once to create the async generator that Input
    instructions pull from; that generator is closed when the run ends
    (normally or with an error). `writer` is awaited with every value produced
    by an Output instruction.

    Raises RuntimeError for an unknown opcode or parameter mode, for a negative
    instruction/read/write address, and when input is exhausted. Raises
    NotImplementedError when a destination parameter is not in Position or
    Relative mode.
    """

    def opcode() -> Opcode:
        # The two lowest decimal digits of the instruction select the operation
        code = prog[ptr] % 100
        try:
            return Opcode(code)
        except ValueError:
            raise RuntimeError(f"Unknown opcode '{code}' at index {ptr} ({prog[ptr]})") from None

    def get_mode(param: int) -> ParameterMode:
        # Parameter 1 uses the hundreds digit, parameter 2 the thousands digit, and so on
        mode = (prog[ptr] // (10 ** (param + 1))) % 10
        try:
            return ParameterMode(mode)
        except ValueError:
            raise RuntimeError(f"Unknown parameter mode '{mode}' at index {ptr} ({prog[ptr]})") from None

    def read(param: int) -> int:
        # Resolve the 1-based parameter of the current instruction to a value
        if param < 1:
            raise RuntimeError(f'Parameter index cannot be less than 1. {param} at index {ptr} ({prog[ptr]})')

        mode = get_mode(param)
        if mode == ParameterMode.Position:
            idx = prog[ptr + param]
        elif mode == ParameterMode.Immediate:
            idx = ptr + param
        elif mode == ParameterMode.Relative:
            idx = rel_base + prog[ptr + param]
        else:
            # Only reachable if a mode is added to the enum without being handled here
            raise NotImplementedError(f"Parameter mode '{mode.name}' ({mode.value}) not yet implemented")

        if idx < 0:
            raise RuntimeError(f'Cannot read from negative pointer {idx} at index {ptr} ({prog[ptr]})')

        return prog[idx]

    def write(param: int, value: int) -> None:
        # Store value at the address named by the 1-based parameter
        if param < 1:
            raise RuntimeError(f'Parameter index cannot be less than 1. {param} at index {ptr} ({prog[ptr]})')

        mode = get_mode(param)
        if mode == ParameterMode.Position:
            idx = prog[ptr + param]
        elif mode == ParameterMode.Relative:
            idx = rel_base + prog[ptr + param]
        else:
            # Immediate mode has no address to write to
            raise NotImplementedError(f'Output parameter in mode {mode.name} ({mode.value}) is not supported')

        if idx < 0:
            raise RuntimeError(f'Cannot write to negative pointer {idx} at index {ptr} ({prog[ptr]})')

        prog[idx] = value

    ptr = 0
    rel_base = 0
    read_iter = reader()

    try:
        # Main program loop starts here
        while True:
            if ptr < 0:
                raise RuntimeError(f'Cannot run with negative pointer {ptr}')

            instr = opcode()

            if instr == Opcode.Add:
                write(3, read(1) + read(2))
                ptr += 4
            elif instr == Opcode.Multiply:
                write(3, read(1) * read(2))
                ptr += 4
            elif instr == Opcode.Input:
                val: Optional[int] = None
                async for val in read_iter:
                    # Read one value from input and break from the async generator
                    write(1, val)
                    break

                # The loop body never ran, so the generator had nothing left
                if val is None:
                    raise RuntimeError('Input exhausted')

                ptr += 2
            elif instr == Opcode.Output:
                # Write is async because it may need to acquire a lock
                await writer(read(1))
                ptr += 2
            elif instr == Opcode.JmpIfTrue:
                if read(1):
                    ptr = read(2)
                else:
                    ptr += 3
            elif instr == Opcode.JmpIfFalse:
                if not read(1):
                    ptr = read(2)
                else:
                    ptr += 3
            elif instr == Opcode.LessThan:
                val = 0
                if read(1) < read(2):
                    val = 1
                write(3, val)
                ptr += 4
            elif instr == Opcode.Equals:
                val = 0
                if read(1) == read(2):
                    val = 1
                write(3, val)
                ptr += 4
            elif instr == Opcode.SetRelBase:
                rel_base += read(1)
                ptr += 2
            elif instr == Opcode.Halt:
                return ptr
            else:
                raise NotImplementedError(f"Opcode '{instr.name}' ({instr.value}) not yet implemented")
    finally:
        # The generator is left suspended at its last yield after every Input
        # instruction; close it explicitly so anything it still holds (for
        # example a lock taken with `async with`) is released deterministically
        # instead of whenever the garbage collector gets to it
        await read_iter.aclose()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from collections import deque | |
from typing import AsyncGenerator | |
from typing import Awaitable | |
from typing import Callable | |
from typing import Tuple | |
# A Reader is a zero-argument factory returning an async generator of input values
Reader = Callable[[], AsyncGenerator[int, None]]

# A Writer is an awaitable callback that receives one output value at a time
Writer = Callable[[int], Awaitable[None]]
async def stdin() -> AsyncGenerator[int, None]:
    """
    Default Reader implementation: prompts on STDIN for every value requested
    and yields the entered line converted with int(). Never finishes on its own.
    """
    prompt = '(input)> '
    while True:
        answer = input(prompt)
        yield int(answer)
async def stdout(val: int) -> None:
    """
    Default Writer implementation: prints the value to STDOUT behind an
    '(output)> ' prefix.
    """
    line = f'(output)> {val}'
    print(line)
def repeat_input(val: int) -> Reader:
    """
    Builds a Reader whose generators yield `val` every time a value is
    requested, without ever running out.
    """
    async def _forever() -> AsyncGenerator[int, None]:
        while True:
            yield val

    return _forever
def static_input(*init: int) -> Reader:
    """
    Builds a Reader that hands out the given values in order and then stops.

    The pending values live on the Reader itself, not on each generator it
    creates, so every generator obtained from the same Reader consumes from
    the same remaining values.
    """
    # Stored back-to-front so the next value is always a pop() off the end
    pending = list(reversed(init))

    async def _drain() -> AsyncGenerator[int, None]:
        while pending:
            yield pending.pop()

    return _drain
def input_concat(*readers: Reader) -> Reader:
    """
    Chains several Readers into one: the returned Reader yields everything
    from the first Reader, then everything from the second, and so on. A later
    Reader is only started once the one before it has finished.
    """
    async def _chained() -> AsyncGenerator[int, None]:
        for make_source in readers:
            async for item in make_source():
                yield item

    return _chained
def pipe(*init: int) -> Tuple[Reader, Writer]:
    """
    Provides a Reader/Writer pair that are connected to each other so that multiple programs
    can be connected together.

    Values given as `init` are queued ahead of anything sent through the
    Writer. Generators created by the Reader wait until a value is available
    and never finish on their own. All generators created from the same Reader
    consume from the same queue.
    """
    buffer = deque(init)
    cond = asyncio.Condition()

    async def reader() -> AsyncGenerator[int, None]:
        while True:
            async with cond:
                while not buffer:
                    await cond.wait()
                val = buffer.popleft()
            # Yield only after the lock has been released. A generator stays
            # suspended at its yield until the consumer asks for the next
            # value, so yielding inside the `async with` would keep the lock
            # held for that whole time and block every writer (forever, if the
            # consumer never reads again)
            yield val

    async def writer(val: int) -> None:
        async with cond:
            buffer.append(val)
            cond.notify_all()

    return reader, writer
def spread_write(*writers: Writer) -> Writer:
    """
    Takes in multiple writers and returns a writer that will write to all of them.

    The individual writes run concurrently and the returned writer completes
    once all of them have completed. An exception raised by any writer is
    propagated to the caller. With no writers the returned writer does nothing.
    """
    async def _helper(val: int) -> None:
        # gather() accepts bare awaitables (asyncio.wait() no longer takes
        # coroutines), copes with an empty argument list, and surfaces writer
        # exceptions rather than leaving them unretrieved
        await asyncio.gather(*(w(val) for w in writers))

    return _helper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import defaultdict | |
from typing import Dict | |
# Program memory: maps an address to the value stored at that address
Program = Dict[int, int]
def load(data: str) -> Program:
    """
    Parses comma-separated integers into program memory keyed by position.

    Surrounding whitespace is stripped and empty fields are skipped. The
    result is a defaultdict, so addresses that were never set read as 0.
    """
    memory: Program = defaultdict(int)
    fields = data.strip().split(',')
    for address, value in enumerate(int(field) for field in fields if field):
        memory[address] = value
    return memory
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment