Skip to content

Instantly share code, notes, and snippets.

@killjoy1221
Created December 8, 2019 01:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save killjoy1221/0085c0a94f95a179720077e4b07e692c to your computer and use it in GitHub Desktop.
Save killjoy1221/0085c0a94f95a179720077e4b07e692c to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
import importlib
import sys
def open_input(name):
    """Open the puzzle input called *name* and return the file object.

    The file is looked up at ``input/<name>.txt`` relative to the current
    working directory and opened with ``open``'s defaults. Closing the
    returned file is the caller's job (typically via a ``with`` block).
    """
    path = f"input/{name}.txt"
    handle = open(path)
    return handle
def main():
    """Import a puzzle module named on the command line and run it.

    Usage: ``aoc.py DAY [FUNCTION]``. ``DAY`` is imported as a module;
    ``FUNCTION`` (``main`` unless exactly one extra argument is given) is
    looked up on that module and called with no arguments.

    Raises:
        SystemExit: if no ``DAY`` argument was supplied; the usage line is
            used as the exit message.
        Exception: if the module has no attribute named ``FUNCTION``.
    """
    if len(sys.argv) < 2:
        # Without this guard the missing argument surfaced as a bare
        # IndexError from sys.argv[1], which says nothing about usage.
        program = sys.argv[0] if sys.argv else 'aoc.py'
        raise SystemExit(f"usage: {program} DAY [FUNCTION]")

    day = sys.argv[1]
    mtd = sys.argv[2] if len(sys.argv) == 3 else 'main'

    def default_method():
        # Fallback handed to getattr so that a missing function is reported
        # only once we actually try to call it.
        raise Exception("Cannot find function " + mtd)

    day_mod = importlib.import_module(day)
    getattr(day_mod, mtd, default_method)()
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # An interrupt still ends the run quietly (no traceback), but it is
        # reported through the exit status instead of looking like success.
        # 130 is the conventional shell status for "terminated by Ctrl-C".
        sys.exit(130)
import traceback
from _thread import interrupt_main
from threading import Thread, Lock, Condition
from typing import List
# Parameter modes understood by Bytecode.get():
#   position  -> the operand is an address into memory
#   immediate -> the operand is the value itself
P_MODE_POSITION, P_MODE_IMMEDIATE = 0, 1
def catch_exceptions(f):
    """Decorate *f* so that an exception inside it interrupts the main thread.

    The wrapper returns whatever *f* returns. If *f* raises an ``Exception``,
    the wrapper swallows it and returns ``None``; the first time that happens
    (across every call made through this wrapper) the traceback is printed
    and ``interrupt_main()`` is invoked. Later failures are swallowed
    silently so the report and the interrupt happen only once.

    The wrapper keeps *f*'s name, docstring and ``__wrapped__`` reference.
    """
    # Imported locally so this block does not depend on a file-level import.
    from functools import wraps

    exit_lock = Lock()
    exited = False

    @wraps(f)
    def wrapper(*args, **kwargs):
        nonlocal exited
        try:
            return f(*args, **kwargs)
        except Exception:
            # The lock makes the "first failure wins" check atomic when
            # several threads fail at about the same time.
            with exit_lock:
                if not exited:
                    exited = True
                    traceback.print_exc()
                    interrupt_main()

    return wrapper
class IntBuffer:
    """Append-only queue of ints with a blocking ``read``.

    Values are kept in ``buffer``; ``read_index`` is the position of the next
    value to hand out and ``write_index`` the number of values written so far.
    All three are only touched while holding ``lock`` (through
    ``write_condition``), so a reader can never miss a writer's notification.
    """

    def __init__(self, *buffer: int):
        """Create a buffer pre-filled with *buffer*, all of it still unread."""
        self.buffer = list(buffer)
        self.read_index = 0
        self.write_index = len(self.buffer)
        self.lock = Lock()
        self.write_condition = Condition(self.lock)

    def read(self, timeout: float = None) -> int:
        """Return the next unread value, waiting for a write if necessary.

        Args:
            timeout: maximum number of seconds to wait; ``None`` waits
                indefinitely and ``0`` only checks what is already buffered.

        Raises:
            TimeoutError: if no value became available within *timeout*.
        """
        with self.write_condition:
            # The emptiness check and the wait happen under the same lock.
            # Checking first and locking afterwards would let a write slip in
            # between, and its notify() would be lost.
            available = self.write_condition.wait_for(
                lambda: self.read_index < self.write_index, timeout
            )
            if not available:
                raise TimeoutError()
            value = self.buffer[self.read_index]
            self.read_index += 1
            return value

    def write(self, i: int):
        """Append *i* and wake one reader blocked in ``read``."""
        with self.write_condition:
            self.buffer.append(i)
            self.write_index += 1
            self.write_condition.notify()
class Opcodes(dict):
    """Dictionary of opcode number -> handler, filled through decoration.

    Calling an instance with an opcode number yields a decorator; applying
    that decorator stores the function under the number and hands the
    function back unchanged.
    """

    def __call__(self, code):
        def register(handler):
            self[code] = handler
            return handler

        return register
class Bytecode(Thread):
    """Interpreter for an integer program, executed on its own thread.

    An instruction is one integer: its two lowest decimal digits select the
    opcode, the remaining digits give the parameter modes, least significant
    digit first. Input is taken from ``stdin`` and output is appended to
    ``stdout`` (both ``IntBuffer``), so other threads can feed and drain a
    running program.
    """

    # Opcode number -> handler; populated by the @opcodes(n) decorators below.
    opcodes = Opcodes()

    def __init__(self, memory: List[int], stdin: IntBuffer = None):
        """Prepare a program for execution.

        Args:
            memory: the program; it is used as-is (not copied) and modified
                in place while running.
            stdin: buffer to read input from; a fresh empty one by default.
        """
        Thread.__init__(self)
        self.memory = memory
        self.stdin = stdin if stdin is not None else IntBuffer()
        self.stdout = IntBuffer()
        self.index = 0  # position of the next memory cell to fetch
        self.stdin_index = 0

    def get(self, value, mode):
        """Resolve an operand: a memory lookup or the literal, per *mode*.

        Raises:
            ValueError: if *mode* is not one of the known parameter modes.
        """
        if mode == P_MODE_POSITION:
            return self.memory[value]
        if mode == P_MODE_IMMEDIATE:
            return value
        # Falling through used to return None, which only failed later and
        # far away from the malformed instruction.
        raise ValueError("Unknown parameter mode: " + str(mode))

    def set(self, position, value):
        """Store *value* at memory address *position*."""
        self.memory[position] = value

    def jump(self, pos):
        """Continue execution at memory address *pos*."""
        self.index = pos

    def read(self, count=1):
        """Fetch the next *count* memory cells and advance past them.

        Returns the bare value when exactly one cell was fetched, otherwise
        a tuple of the fetched values.
        """
        reads = []
        for _ in range(count):
            reads.append(self.memory[self.index])
            self.index += 1
        if len(reads) == 1:
            return reads[0]
        return tuple(reads)

    @opcodes(1)
    def add(self, p1_mode=0, p2_mode=0):
        """Opcode 1: store the sum of the first two operands at the third."""
        a, b, c = self.read(3)
        self.set(c, self.get(a, p1_mode) + self.get(b, p2_mode))

    @opcodes(2)
    def mul(self, p1_mode=0, p2_mode=0):
        """Opcode 2: store the product of the first two operands at the third."""
        a, b, c = self.read(3)
        self.set(c, self.get(a, p1_mode) * self.get(b, p2_mode))

    @opcodes(3)
    def input(self):
        """Opcode 3: store the next stdin value at the operand address.

        Waits until a value is available.
        """
        a = self.read()
        self.set(a, self.stdin.read())

    @opcodes(4)
    def output(self, p1_mode=0):
        """Opcode 4: write the operand's value to stdout."""
        a = self.read()
        self.stdout.write(self.get(a, p1_mode))

    @opcodes(5)
    def jump_true(self, p1_mode=0, p2_mode=0):
        """Opcode 5: jump to the second operand if the first is non-zero."""
        a, b = self.read(2)
        if self.get(a, p1_mode):
            self.jump(self.get(b, p2_mode))

    @opcodes(6)
    def jump_false(self, p1_mode=0, p2_mode=0):
        """Opcode 6: jump to the second operand if the first is zero."""
        a, b = self.read(2)
        if not self.get(a, p1_mode):
            self.jump(self.get(b, p2_mode))

    @opcodes(7)
    def less_than(self, p1_mode=0, p2_mode=0):
        """Opcode 7: store 1 at the third operand if first < second, else 0."""
        a, b, c = self.read(3)
        self.set(c, int(self.get(a, p1_mode) < self.get(b, p2_mode)))

    @opcodes(8)
    def equals(self, p1_mode=0, p2_mode=0):
        """Opcode 8: store 1 at the third operand if first == second, else 0."""
        a, b, c = self.read(3)
        self.set(c, int(self.get(a, p1_mode) == self.get(b, p2_mode)))

    @opcodes(99)
    def end(self):
        """Opcode 99: halt by raising ``SystemExit``, which ``run`` catches."""
        # Raised directly instead of calling the site-provided exit(): that
        # helper also closes sys.stdin and is absent when Python runs
        # without the site module.
        raise SystemExit

    @catch_exceptions
    def run(self):
        """Fetch, decode and execute instructions until the halt opcode.

        Raises:
            Exception: if an instruction's opcode has no registered handler.
        """
        while True:
            instruction = self.read()
            mode_digits = instruction // 100
            # Least significant digit first, matching handler parameter
            # order. Nothing is passed when there are no mode digits, because
            # some handlers accept no mode arguments at all.
            modes = [int(digit) for digit in reversed(str(mode_digits))] if mode_digits else []
            code = instruction % 100
            if code not in self.opcodes:
                raise Exception("Unknown opcode: " + str(code))
            try:
                self.opcodes[code](self, *modes)
            except SystemExit:
                return
            except TypeError:
                # Typically more mode digits than the handler has parameters;
                # show the offending instruction before propagating.
                print("code:", code, "modes:", modes)
                raise
import itertools
from typing import List, Iterable
import aoc
from bytecode import Bytecode, IntBuffer
def main():
    """Load this module's puzzle input and print both results.

    The first line printed is for the single-pass amplifier chain, the
    second for the chain run with feedback.
    """
    instructions = get_instructions()
    for use_feedback in (False, True):
        if use_feedback:
            print_amplification(instructions, feedback=True)
        else:
            print_amplification(instructions)
def test():
    """Print the results for the worked example programs.

    The expected maximum signal and phase setting are noted above each
    program; the output has to be compared against them by eye.
    """
    single_pass_programs = (
        # max signal: 43210
        # phase setting: 4,3,2,1,0
        [3, 15, 3, 16, 1002, 16, 10, 16, 1, 16, 15, 15, 4, 15, 99, 0, 0],
        # max signal: 54321
        # phase setting: 0,1,2,3,4
        [3, 23, 3, 24, 1002, 24, 10, 24, 1002, 23, -1, 23,
         101, 5, 23, 23, 1, 24, 23, 23, 4, 23, 99, 0, 0],
        # max signal: 65210
        # phase setting: 1,0,4,3,2
        [3, 31, 3, 32, 1002, 32, 10, 32, 1001, 31, -2, 31, 1007, 31, 0, 33,
         1002, 33, 7, 33, 1, 33, 31, 31, 1, 32, 31, 31, 4, 31, 99, 0, 0, 0],
    )
    feedback_programs = (
        # max signal: 139629729
        # phase setting: 9,8,7,6,5
        [3, 26, 1001, 26, -4, 26, 3, 27, 1002, 27, 2, 27, 1, 27, 26,
         27, 4, 27, 1001, 28, -1, 28, 1005, 28, 6, 99, 0, 0, 5],
        # max signal: 18216
        # phase setting: 9,7,8,5,6
        [3, 52, 1001, 52, -5, 52, 3, 53, 1, 52, 56, 54, 1007, 54, 5, 55, 1005, 55, 26, 1001, 54,
         -5, 54, 1105, 1, 12, 1, 53, 54, 53, 1008, 54, 0, 55, 1001, 55, 1, 55, 2, 53, 55, 53, 4,
         53, 1001, 56, -1, 56, 1005, 56, 6, 99, 0, 0, 0, 0, 10],
    )

    for program in single_pass_programs:
        print_amplification(program)
    for program in feedback_programs:
        print_amplification(program, feedback=True)
def print_amplification(instructions, **kwargs):
    """Print the best thruster signal and the phase sequence producing it.

    Extra keyword arguments are forwarded to ``get_max_amplification``.
    """
    best_phases, best_signal = get_max_amplification(instructions, **kwargs)
    sequence = ','.join(str(phase) for phase in best_phases)
    print(f"Max thruster signal: {best_signal} (from phase setting sequence {sequence})")
def get_instructions():
    """Read this module's input file and return its comma-separated ints."""
    with aoc.open_input(__name__) as source:
        raw = source.read()
    return list(map(int, raw.split(',')))
def get_max_amplification(instructions: List[int], *, feedback=False):
    """Try every phase ordering and return the best one.

    Phases are the permutations of 5..9 when *feedback* is set and of 0..4
    otherwise.

    Returns:
        A ``(phase_sequence, signal)`` pair for the highest signal; when
        several orderings tie, the first one generated is returned.
    """
    if feedback:
        phase_values = range(5, 10)
    else:
        phase_values = range(5)

    outcomes = (
        (sequence, amplify(instructions, sequence, feedback))
        for sequence in itertools.permutations(phase_values)
    )
    return max(outcomes, key=lambda outcome: outcome[1])
def amplify(instructions: List[int], phases: Iterable[int], feedback):
    """Run a chain of amplifier programs and return the final signal.

    One ``Bytecode`` thread is started per phase, each on its own copy of
    *instructions* and with its phase as first input. The signal starts at 0
    and is passed through the chain: every amplifier receives the current
    signal and its next output becomes the new signal.

    Args:
        instructions: the program every amplifier runs.
        phases: one phase setting per amplifier (any non-zero count).
        feedback: when true, the last amplifier's output is fed back into the
            first one, round after round, until an amplifier has halted
            without producing further output.

    Returns:
        The last amplifier's output from the final completed round.

    Raises:
        ValueError: if *phases* is empty.
        RuntimeError: if an amplifier halts before the first round completes.
    """
    poll_interval = 0.01  # seconds between liveness checks while waiting

    def new_bytecode(phase: int) -> Bytecode:
        bc = Bytecode(instructions.copy(), IntBuffer(phase))
        bc.daemon = True
        bc.start()
        return bc

    def next_output(bc: Bytecode):
        """Return bc's next output, or None once it halted with none left."""
        while True:
            # Liveness is sampled *before* the read: a thread already seen
            # dead cannot write any more, so an empty buffer is then final.
            alive = bc.is_alive()
            try:
                return bc.stdout.read(poll_interval if alive else 0)
            except TimeoutError:
                if not alive:
                    return None

    amplifiers = [new_bytecode(phase) for phase in phases]
    if not amplifiers:
        raise ValueError("at least one phase setting is required")

    signal = 0
    thruster = None
    while True:
        for amplifier in amplifiers:
            # Writing to an amplifier that has already halted is harmless:
            # the value just stays unread in its buffer.
            amplifier.stdin.write(signal)
            produced = next_output(amplifier)
            if produced is None:
                if thruster is None:
                    raise RuntimeError("amplifier halted without producing output")
                return thruster
            signal = produced
        thruster = signal
        if not feedback:
            return thruster
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment