Created
December 8, 2019 01:03
-
-
Save killjoy1221/0085c0a94f95a179720077e4b07e692c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import importlib | |
import sys | |
def open_input(name):
    """Open the puzzle input file for *name* and return the file object.

    The path is ``input/<name>.txt``, relative to the current working
    directory. The file is opened with ``open``'s defaults; closing it is
    left to the caller (typically via a ``with`` statement).
    """
    input_path = f"input/{name}.txt"
    return open(input_path)
def main():
    """Import a puzzle module named on the command line and call one function.

    Usage: ``<script> <day> [function]``. ``<day>`` is passed to
    ``importlib.import_module``; ``[function]`` names the attribute of that
    module to call with no arguments and defaults to ``main``.

    Raises:
        SystemExit: with a usage message when no module name was given.
        Exception: when the module has no attribute with the requested name.
    """
    if len(sys.argv) < 2:
        # Fail with a readable message instead of an IndexError traceback.
        prog = sys.argv[0] if sys.argv else 'aoc'
        sys.exit(f"usage: {prog} <day> [function]")

    day = sys.argv[1]
    # Honour the function argument whenever it is present, even if extra
    # trailing arguments follow it.
    mtd = sys.argv[2] if len(sys.argv) >= 3 else 'main'

    def default_method():
        # Fallback used by getattr() below when the module lacks `mtd`.
        raise Exception("Cannot find function " + mtd)

    day_mod = importlib.import_module(day)
    getattr(day_mod, mtd, default_method)()
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # End the run quietly on an interrupt instead of showing a traceback.
        pass
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import traceback | |
from _thread import interrupt_main | |
from threading import Thread, Lock, Condition | |
from typing import List | |
# Parameter modes, as interpreted by Bytecode.get().
P_MODE_POSITION = 0  # the parameter is an index into memory
P_MODE_IMMEDIATE = 1  # the parameter is used as the value itself
def catch_exceptions(f):
    """Wrap *f* so that an exception raised by it interrupts the main thread.

    The wrapper returns whatever *f* returns. When *f* raises an
    ``Exception``, the first such failure prints the traceback and calls
    ``interrupt_main()``; later failures of the same wrapped function are
    swallowed without output. In either failure case the wrapper returns
    ``None``. Exceptions that do not derive from ``Exception`` (for example
    ``SystemExit``) propagate unchanged.

    The wrapper keeps *f*'s name and docstring via ``functools.wraps``.
    """
    import functools  # local import so the file's import block is untouched

    exit_lock = Lock()
    exited = False  # becomes True once the first failure has been reported

    @functools.wraps(f)
    def decorator(*args, **kwargs):
        nonlocal exited
        try:
            return f(*args, **kwargs)
        except Exception:
            # The lock makes "check and set" atomic, so only one failing
            # caller reports and interrupts.
            with exit_lock:
                if not exited:
                    exited = True
                    traceback.print_exc()
                    interrupt_main()

    return decorator
class IntBuffer:
    """A growing list of ints with a blocking, in-order reader.

    Values are appended with ``write`` and consumed in order with ``read``.
    ``read`` blocks while every written value has already been consumed.
    All state changes happen while holding ``lock``.
    """

    def __init__(self, *buffer: int):
        """Create the buffer, pre-filled with the given values."""
        self.buffer = list(buffer)
        self.read_index = 0  # index of the next value to hand out
        self.write_index = len(self.buffer)  # number of values written so far
        self.lock = Lock()
        self.write_condition = Condition(self.lock)

    def read(self, timeout: float = None) -> int:
        """Return the next unread value, waiting for a write if necessary.

        Args:
            timeout: maximum number of seconds to wait; ``None`` waits
                indefinitely.

        Raises:
            TimeoutError: if no value became available within *timeout*.
                ``read_index`` is left unchanged in that case.
        """
        with self.lock:
            # The emptiness test runs under the lock and is re-evaluated on
            # every wakeup, so a write that lands just before the wait is not
            # missed and a notify without data does not end the wait.
            has_data = self.write_condition.wait_for(
                lambda: self.read_index < self.write_index, timeout
            )
            if not has_data:
                raise TimeoutError()
            value = self.buffer[self.read_index]
            self.read_index += 1
            return value

    def write(self, i: int):
        """Append *i* and wake one reader blocked in ``read``."""
        with self.lock:
            self.buffer.append(i)
            self.write_index += 1
            self.write_condition.notify()
class Opcodes(dict):
    """A dict of opcode -> handler that doubles as a registration decorator.

    Calling the instance with an opcode returns a decorator; applying that
    decorator stores the function under the opcode and returns the function
    unchanged.
    """

    def __call__(self, code):
        def register(handler):
            # Record the handler under `code`, then hand it back untouched.
            self[code] = handler
            return handler

        return register
class Bytecode(Thread):
    """A thread that interprets a program stored as a list of integers.

    Each instruction is an integer whose last two decimal digits select the
    handler registered in ``opcodes``; the remaining digits, read from least
    significant upwards, are the parameter modes passed to that handler.
    Input values are taken from ``stdin`` and output values are written to
    ``stdout``, both ``IntBuffer`` objects.
    """

    # Opcode number -> handler function, filled in by the decorators below.
    opcodes = Opcodes()

    def __init__(self, memory: List[int], stdin: IntBuffer = None):
        """Prepare the interpreter.

        Args:
            memory: the program; it is modified in place while running.
            stdin: buffer to read input from; a fresh empty ``IntBuffer`` is
                created when omitted.
        """
        Thread.__init__(self)
        self.memory = memory
        self.stdin = IntBuffer() if stdin is None else stdin
        self.stdout = IntBuffer()
        self.index = 0  # position of the next memory cell to read
        self.stdin_index = 0

    def get(self, value, mode):
        """Resolve a parameter to its value according to *mode*.

        Raises:
            ValueError: if *mode* is neither position nor immediate mode.
        """
        if mode == P_MODE_POSITION:
            return self.memory[value]
        if mode == P_MODE_IMMEDIATE:
            return value
        # Fail here rather than letting an implicit None leak into later
        # arithmetic or into the output buffer.
        raise ValueError(f"Unknown parameter mode: {mode}")

    def set(self, position, value):
        """Store *value* at memory index *position*."""
        self.memory[position] = value

    def jump(self, pos):
        """Continue execution at memory index *pos*."""
        self.index = pos

    def read(self, count=1):
        """Consume *count* cells at the current position.

        Returns the single cell when exactly one was read, otherwise a tuple
        of the cells read.
        """
        reads = []
        for _ in range(count):
            reads.append(self.memory[self.index])
            self.index += 1
        if len(reads) == 1:
            return reads[0]
        return tuple(reads)

    @opcodes(1)
    def add(self, p1_mode=0, p2_mode=0):
        """Store the sum of the first two parameters at the third."""
        a, b, c = self.read(3)
        self.set(c, self.get(a, p1_mode) + self.get(b, p2_mode))

    @opcodes(2)
    def mul(self, p1_mode=0, p2_mode=0):
        """Store the product of the first two parameters at the third."""
        a, b, c = self.read(3)
        self.set(c, self.get(a, p1_mode) * self.get(b, p2_mode))

    @opcodes(3)
    def input(self):
        """Read one value from ``stdin`` (blocking) and store it."""
        a = self.read()
        self.set(a, self.stdin.read())

    @opcodes(4)
    def output(self, p1_mode=0):
        """Write the parameter's value to ``stdout``."""
        a = self.read()
        self.stdout.write(self.get(a, p1_mode))

    @opcodes(5)
    def jump_true(self, p1_mode=0, p2_mode=0):
        """Jump to the second parameter when the first is non-zero."""
        a, b = self.read(2)
        if self.get(a, p1_mode):
            self.jump(self.get(b, p2_mode))

    @opcodes(6)
    def jump_false(self, p1_mode=0, p2_mode=0):
        """Jump to the second parameter when the first is zero."""
        a, b = self.read(2)
        if not self.get(a, p1_mode):
            self.jump(self.get(b, p2_mode))

    @opcodes(7)
    def less_than(self, p1_mode=0, p2_mode=0):
        """Store 1 at the third parameter if first < second, else 0."""
        a, b, c = self.read(3)
        self.set(c, int(self.get(a, p1_mode) < self.get(b, p2_mode)))

    @opcodes(8)
    def equals(self, p1_mode=0, p2_mode=0):
        """Store 1 at the third parameter if first == second, else 0."""
        a, b, c = self.read(3)
        self.set(c, int(self.get(a, p1_mode) == self.get(b, p2_mode)))

    @opcodes(99)
    def end(self):
        """Stop the program; ``run`` catches the SystemExit and returns."""
        # Raised directly instead of calling the interactive helper exit(),
        # which also closes sys.stdin and is only defined by the site module.
        raise SystemExit

    @catch_exceptions
    def run(self):
        """Execute instructions until the halt opcode is reached.

        Raises:
            Exception: for an opcode with no registered handler.
        """
        while True:
            mode_digits, code = divmod(self.read(), 100)
            # No mode digits at all (e.g. 3 // 100 == 0) means "pass no
            # modes", so the handler's own defaults apply. Otherwise the
            # digits are reversed so the first parameter's mode comes first.
            if mode_digits:
                modes = [int(digit) for digit in reversed(str(mode_digits))]
            else:
                modes = []
            if code not in self.opcodes:
                raise Exception("Unknown opcode: " + str(code))
            try:
                self.opcodes[code](self, *modes)
            except SystemExit:
                return
            except TypeError:
                # Typically more modes were supplied than the handler takes;
                # show the offending instruction before re-raising.
                print("code:", code, "modes:", modes)
                raise
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import itertools | |
from typing import List, Iterable | |
import aoc | |
from bytecode import Bytecode, IntBuffer | |
def main():
    """Print the best thruster signal for the puzzle input, twice.

    The first search runs without feedback, the second with
    ``feedback=True``.
    """
    # Reuse the module's loader instead of repeating the parsing code here.
    instructions = get_instructions()
    print_amplification(instructions)
    print_amplification(instructions, feedback=True)
def test():
    """Run the search on five sample programs and print each result.

    The expected answer is noted beside each program. Nothing is asserted;
    the results are only printed for comparison.
    """
    samples = (
        # max signal: 43210
        # phase setting: 4,3,2,1,0
        ([3, 15, 3, 16, 1002, 16, 10, 16, 1, 16, 15, 15, 4, 15, 99, 0, 0], {}),
        # max signal: 54321
        # phase setting: 0,1,2,3,4
        ([3, 23, 3, 24, 1002, 24, 10, 24, 1002, 23, -1, 23,
          101, 5, 23, 23, 1, 24, 23, 23, 4, 23, 99, 0, 0], {}),
        # max signal: 65210
        # phase setting: 1,0,4,3,2
        ([3, 31, 3, 32, 1002, 32, 10, 32, 1001, 31, -2, 31, 1007, 31, 0, 33,
          1002, 33, 7, 33, 1, 33, 31, 31, 1, 32, 31, 31, 4, 31, 99, 0, 0, 0], {}),
        # max signal: 139629729
        # phase setting: 9,8,7,6,5
        ([3, 26, 1001, 26, -4, 26, 3, 27, 1002, 27, 2, 27, 1, 27, 26,
          27, 4, 27, 1001, 28, -1, 28, 1005, 28, 6, 99, 0, 0, 5],
         {"feedback": True}),
        # max signal: 18216
        # phase setting: 9,7,8,5,6
        ([3, 52, 1001, 52, -5, 52, 3, 53, 1, 52, 56, 54, 1007, 54, 5, 55, 1005, 55, 26, 1001, 54,
          -5, 54, 1105, 1, 12, 1, 53, 54, 53, 1008, 54, 0, 55, 1001, 55, 1, 55, 2, 53, 55, 53, 4,
          53, 1001, 56, -1, 56, 1005, 56, 6, 99, 0, 0, 0, 0, 10],
         {"feedback": True}),
    )
    for program, options in samples:
        print_amplification(program, **options)
def print_amplification(instructions, **kwargs):
    """Find the best phase sequence for *instructions* and print it.

    Extra keyword arguments are forwarded to ``get_max_amplification``.
    """
    best = get_max_amplification(instructions, **kwargs)
    phase_sequence, signal = best
    phase_text = ','.join(str(phase) for phase in phase_sequence)
    print(f"Max thruster signal: {signal} (from phase setting sequence {phase_text})")
def get_instructions():
    """Load this module's puzzle input as a list of ints.

    The input is opened through ``aoc.open_input`` using this module's name
    and is expected to be comma-separated integers.
    """
    with aoc.open_input(__name__) as source:
        raw = source.read()
    return list(map(int, raw.split(',')))
def get_max_amplification(instructions: List[int], *, feedback=False):
    """Try every phase ordering and return the best one.

    Phases are the permutations of 5..9 when *feedback* is true and of 0..4
    otherwise. Returns a ``(phase_sequence, signal)`` tuple for the ordering
    with the largest signal; on a tie the earliest permutation wins.
    """
    if feedback:
        phase_values = range(5, 10)
    else:
        phase_values = range(5)

    results = {
        ordering: amplify(instructions, ordering, feedback)
        for ordering in itertools.permutations(phase_values)
    }
    return max(results.items(), key=lambda entry: entry[1])
def amplify(instructions: List[int], phases: Iterable[int], feedback):
    """Run five chained interpreters and return the last one's output.

    Each interpreter gets its own copy of *instructions* and starts with its
    phase already queued on its input. The first one is fed 0, and every
    output is passed on to the next interpreter's input. With *feedback*
    true, the last output is fed back to the first for as long as the first
    interpreter's thread is alive.

    Raises:
        TimeoutError: if a read inside the feedback loop waits longer than
            one second.
    """
    def spawn(phase: int) -> Bytecode:
        # Daemon threads do not keep the process alive once main exits.
        machine = Bytecode(instructions.copy(), IntBuffer(phase))
        machine.daemon = True
        machine.start()
        return machine

    a, b, c, d, e = (spawn(phase) for phase in phases)

    # Initial pass down the chain; these reads block without a timeout.
    a.stdin.write(0)
    for upstream, downstream in ((a, b), (b, c), (c, d), (d, e)):
        downstream.stdin.write(upstream.stdout.read())

    # NOTE(review): the loop condition depends on the first thread having
    # finished by the time it is checked — confirm this cannot race.
    ring = ((e, a), (a, b), (b, c), (c, d), (d, e))
    while feedback and a.is_alive():
        for upstream, downstream in ring:
            downstream.stdin.write(upstream.stdout.read(1))

    return e.stdout.read()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment