Skip to content

Instantly share code, notes, and snippets.

Last active December 11, 2019 08:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save e-nomem/6361bba17eeaa9d799f370ede7484b62 to your computer and use it in GitHub Desktop.
Save e-nomem/6361bba17eeaa9d799f370ede7484b62 to your computer and use it in GitHub Desktop.
Advent of Code 2019
from enum import IntEnum
from typing import Optional
from .io import Reader
from .io import stdin
from .io import stdout
from .io import Writer
from .utils import Program
class ParameterMode(IntEnum):
    """
    Addressing mode of a single instruction parameter.

    The mode of parameter N is the decimal digit at position N + 1 (counting from 0 at the
    least significant digit) of the instruction value.
    """

    # The parameter holds the address of the cell to use
    Position = 0
    # The parameter cell itself is the value (reads only)
    Immediate = 1
    # The parameter holds an offset that is added to the relative base
    Relative = 2
class Opcode(IntEnum):
    """
    Instruction codes understood by the interpreter.

    The opcode is the instruction value modulo 100; the remaining digits carry the
    parameter modes.
    """

    # param3 = param1 + param2
    Add = 1
    # param3 = param1 * param2
    Multiply = 2
    # param1 = next value from the reader
    Input = 3
    # send param1 to the writer
    Output = 4
    # jump to param2 when param1 is non-zero
    JmpIfTrue = 5
    # jump to param2 when param1 is zero
    JmpIfFalse = 6
    # param3 = 1 if param1 < param2 else 0
    LessThan = 7
    # param3 = 1 if param1 == param2 else 0
    Equals = 8
    # relative base += param1
    SetRelBase = 9
    # stop execution
    Halt = 99
async def run(prog: Program, reader: Reader = stdin, writer: Writer = stdout) -> int:
    """
    Runs the provided program. The program will be mutated, so ensure that a copy is passed in if
    it should be reused/rerun multiple times

    Args:
        prog: Program memory, indexed by integer address. Written to in place.
        reader: Called once to obtain the async generator that supplies Input values.
        writer: Awaited once for every value produced by an Output instruction.

    Returns:
        The instruction pointer at which the Halt instruction was found.

    Raises:
        RuntimeError: On an unknown opcode or parameter mode, a negative pointer, a parameter
            index below 1, or when an Input instruction finds the reader exhausted.
        NotImplementedError: On a known parameter mode or opcode that has no handler here
            (for example a write through an Immediate-mode parameter).
    """
    def opcode() -> Opcode:
        # The two lowest decimal digits of the instruction select the operation
        code = prog[ptr] % 100
        try:
            return Opcode(code)
        except ValueError:
            raise RuntimeError(f"Unknown opcode '{code}' at index {ptr} ({prog[ptr]})") from None

    def get_mode(param: int) -> ParameterMode:
        # Parameter N's mode is the decimal digit at position N + 1 of the instruction
        mode = (prog[ptr] // (10 ** (param + 1))) % 10
        try:
            return ParameterMode(mode)
        except ValueError:
            raise RuntimeError(f"Unknown parameter mode '{mode}' at index {ptr} ({prog[ptr]})") from None

    def read(param: int) -> int:
        if param < 1:
            raise RuntimeError(f'Parameter index cannot be less than 1. {param} at index {ptr} ({prog[ptr]})')

        mode = get_mode(param)
        if mode == ParameterMode.Position:
            idx = prog[ptr + param]
        elif mode == ParameterMode.Immediate:
            # The parameter cell itself holds the value
            idx = ptr + param
        elif mode == ParameterMode.Relative:
            idx = rel_base + prog[ptr + param]
        else:
            raise NotImplementedError(f"Parameter mode '{mode.name}' ({mode.value}) not yet implemented")

        if idx < 0:
            raise RuntimeError(f'Cannot read from negative pointer {idx} at index {ptr} ({prog[ptr]})')

        return prog[idx]

    def write(param: int, value: int) -> None:
        if param < 1:
            raise RuntimeError(f'Parameter index cannot be less than 1. {param} at index {ptr} ({prog[ptr]})')

        mode = get_mode(param)
        if mode == ParameterMode.Position:
            idx = prog[ptr + param]
        elif mode == ParameterMode.Relative:
            idx = rel_base + prog[ptr + param]
        else:
            # Immediate mode has no target address, so it cannot be written through
            raise NotImplementedError(f'Output parameter in mode {mode.name} ({mode.value}) is not supported')

        if idx < 0:
            raise RuntimeError(f'Cannot write to negative pointer {idx} at index {ptr} ({prog[ptr]})')

        prog[idx] = value

    ptr = 0
    rel_base = 0
    read_iter = reader()

    # Main program loop starts here
    while True:
        if ptr < 0:
            raise RuntimeError(f'Cannot run with negative pointer {ptr}')

        instr = opcode()
        if instr == Opcode.Add:
            write(3, read(1) + read(2))
            ptr += 4
        elif instr == Opcode.Multiply:
            write(3, read(1) * read(2))
            ptr += 4
        elif instr == Opcode.Input:
            val: Optional[int] = None
            async for val in read_iter:
                # Read one value from input and break from the async generator
                write(1, val)
                break

            # The loop body never ran, so the generator had nothing left to give
            if val is None:
                raise RuntimeError('Input exhausted')
            ptr += 2
        elif instr == Opcode.Output:
            # Write is async because it may need to acquire a lock
            await writer(read(1))
            ptr += 2
        elif instr == Opcode.JmpIfTrue:
            # A taken jump replaces the pointer; only a skipped jump steps past the instruction
            if read(1):
                ptr = read(2)
            else:
                ptr += 3
        elif instr == Opcode.JmpIfFalse:
            if not read(1):
                ptr = read(2)
            else:
                ptr += 3
        elif instr == Opcode.LessThan:
            val = 0
            if read(1) < read(2):
                val = 1
            write(3, val)
            ptr += 4
        elif instr == Opcode.Equals:
            val = 0
            if read(1) == read(2):
                val = 1
            write(3, val)
            ptr += 4
        elif instr == Opcode.SetRelBase:
            rel_base += read(1)
            ptr += 2
        elif instr == Opcode.Halt:
            return ptr
        else:
            raise NotImplementedError(f"Opcode '{instr.name}' ({instr.value}) not yet implemented")
import asyncio
from collections import deque
from typing import AsyncGenerator
from typing import Awaitable
from typing import Callable
from typing import Tuple
# A Reader is a zero-argument callable that returns an async generator of ints
Reader = Callable[[], AsyncGenerator[int, None]]
# A Writer is a callable that takes one int and returns an awaitable
Writer = Callable[[int], Awaitable[None]]
async def stdin() -> AsyncGenerator[int, None]:
    """
    Default Reader implementation that prompts for input on STDIN

    Yields one integer per prompt and never finishes on its own. Text that is not a valid
    integer makes ``int`` raise ``ValueError``.
    """
    while True:
        yield int(input('(input)> '))
async def stdout(val: int) -> None:
    """
    Default Writer implementation that prints to STDOUT

    Args:
        val: The value to print, prefixed with ``(output)> ``.
    """
    print(f'(output)> {val}')
def repeat_input(val: int) -> Reader:
    """
    Creates a reader that will repeat the same input

    Args:
        val: The value every read returns.

    Returns:
        A Reader whose generator yields ``val`` forever.
    """
    async def _helper() -> AsyncGenerator[int, None]:
        while True:
            yield val

    return _helper
def static_input(*init: int) -> Reader:
    """
    Creates a Reader that will return static data

    Values are yielded in the order they were passed in, and the generator finishes once
    they have all been handed out. The data is shared by every generator obtained from the
    returned Reader, so each value is delivered only once overall.

    Args:
        *init: The values to serve, first to last.

    Returns:
        A Reader over the given values.
    """
    # A single shared iterator keeps the consume-once behaviour across generators
    remaining = iter(init)

    async def _helper() -> AsyncGenerator[int, None]:
        for val in remaining:
            yield val

    return _helper
def input_concat(*readers: Reader) -> Reader:
    """
    Takes in multiple Readers and returns a Reader that will read from them sequentially

    Each Reader is only called once the one before it has been exhausted.

    Args:
        *readers: The Readers to chain, in the order they should be drained.

    Returns:
        A Reader that yields everything from the first Reader, then the second, and so on.
    """
    async def _helper() -> AsyncGenerator[int, None]:
        for reader in readers:
            read_iter = reader()
            async for val in read_iter:
                yield val

    return _helper
def pipe(*init: int) -> Tuple[Reader, Writer]:
    """
    Provides a Reader/Writer pair that are connected to each other so that multiple programs
    can be connected together

    Values come out of the Reader in the order they went in (first in, first out). A read on
    an empty pipe waits until the Writer supplies a value.

    Args:
        *init: Values placed in the pipe before anything is written.

    Returns:
        A ``(reader, writer)`` tuple sharing one buffer.
    """
    buffer = deque(init)
    cond = asyncio.Condition()

    async def reader() -> AsyncGenerator[int, None]:
        while True:
            async with cond:
                while not buffer:
                    await cond.wait()
                val = buffer.popleft()
            # Yield only after the lock is released: the generator stays suspended here
            # until the next read, and the writer must be able to acquire the lock meanwhile
            yield val

    async def writer(val: int) -> None:
        async with cond:
            buffer.append(val)
            # Wake a reader that is waiting for data
            cond.notify()

    return reader, writer
def spread_write(*writers: Writer) -> Writer:
    """
    Takes in multiple writers and returns a writer that will write to all of them

    The writers are awaited concurrently. An exception raised by any of them propagates to
    the caller. With no writers the returned Writer does nothing.

    Args:
        *writers: The Writers that should each receive every value.

    Returns:
        A Writer that fans each value out to all of ``writers``.
    """
    async def _helper(val: int) -> None:
        # gather accepts bare awaitables and an empty argument list; asyncio.wait
        # rejects both on current Python versions and hides writer exceptions
        await asyncio.gather(*(w(val) for w in writers))

    return _helper
from collections import defaultdict
from typing import Dict
# A Program maps an integer index to the integer value stored at that index
Program = Dict[int, int]
def load(data: str) -> Program:
    """
    Parses comma-separated integers into a Program.

    Empty and whitespace-only fields are skipped. The result is a ``defaultdict``, so an
    index that was never set reads as 0.

    Args:
        data: Comma-separated integers; surrounding whitespace is ignored.

    Returns:
        A mapping from position (starting at 0) to the integer at that position.

    Raises:
        ValueError: If a non-blank field is not a valid integer.
    """
    values = [int(field) for field in data.strip().split(',') if field.strip()]
    return defaultdict(int, enumerate(values))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment