Last active
June 16, 2017 04:00
-
-
Save jthemphill/b00d579fcd3ee88f420f505812f2c5ae to your computer and use it in GitHub Desktop.
Attempt to solve JT's tea game for a single player
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import copy | |
import heapq | |
from typing import (Any, Dict, Iterator, List, NewType, Set, Tuple, TypeVar) | |
SMALL_T: int = 2 | |
BIG_T: int = 3 | |
JEFF_SOLUTION = 32 | |
GOAL = 50 | |
Cost = List[Tuple[str, int]] | |
Benefit = List[Tuple[str, int]] | |
Option = Tuple[Tuple[Cost, Benefit], str] | |
T = TypeVar('T') | |
trades: List[Option] = [ | |
# Supplier | |
(([('S', 1), ('$', 5)], [('S', 1), ('t', 5)]), 'pick tea'), | |
(([('B', 1), ('$', 10)], [('B', 1), ('P', 1)]), 'buy teapot'), | |
(([('B', 1), ('$', 15)], [('B', 1), ('S', 1)]), 'hire customer'), | |
# Brewing | |
(([('S', 1), ('P', 1), ('t', 2)], [('S', 1), ('P', 1), ('T', 2)]), 'small brew'), | |
(([('S', 1), ('P', 1), ('t', 5)], [('S', 1), ('P', 1), ('T', 4)]), 'big brew'), | |
(([('B', 1), ('P', 1), ('t', 5)], [('B', 1), ('P', 1), ('T', 6)]), 'boss brew'), | |
# Selling | |
(([('S', 1), ('C', 1), ('T', 1)], [('C', 1), ('S', 1), ('$', 2)]), 'sell customer'), | |
(([('S', 1), ('D', 1), ('T', 5)], [('D', 1), ('S', 1), ('$', 9)]), 'sell party'), | |
(([('B', 1), ('I', 1), ('T', 4)], [('I', 1), ('B', 1), ('$', 12)]), 'sell vip'), | |
] | |
def dlog(x: T) -> T: | |
print(x) | |
return x | |
def optimal_gain_rate() -> int: | |
'''The best rate at which we could possibly gain money''' | |
sell_to_c: int = 6 * 2 // SMALL_T | |
sell_to_d: int = 2 * 9 // SMALL_T | |
sell_to_i: int = 1 * 12 // BIG_T | |
return sell_to_c + sell_to_d + sell_to_i | |
class Timer: | |
def __init__(self): | |
self.jobs: List[Tuple[int, Benefit]] = [] | |
self._key: Any = None | |
def register(self, dt: int, benefit: Benefit) -> None: | |
heapq.heappush(self.jobs, (dt, benefit)) | |
self._key = None | |
def tick(self) -> Tuple[int, List[Benefit]]: | |
if not self.jobs: | |
return (0, []) | |
benefits = [] | |
dt = self.jobs[0][0] | |
while self.jobs and self.jobs[0][0] == dt: | |
benefits.append(heapq.heappop(self.jobs)[1]) | |
self.jobs = [(j[0] - dt, j[1]) for j in self.jobs] | |
self._key = None | |
return (dt, benefits) | |
def copy(self) -> 'Timer': | |
t = copy.deepcopy(self) | |
return t | |
def key(self) -> Any: | |
if self._key is None: | |
self._key = tuple((j[0], tuple(j[1])) for j in sorted(self.jobs)) | |
return self._key | |
def __str__(self) -> str: | |
return 'Timer ({})'.format(self.jobs) | |
def __hash__(self) -> int: | |
return hash(self.key()) | |
def __lt__(self, other: Any) -> bool: | |
return self.key() < other.key() | |
def __eq__(self, other: Any) -> bool: | |
return self.key() == other.key() | |
class Player: | |
def __init__(self): | |
self.resources: Dict[str, int] = { | |
'B': 1, | |
'S': 2, | |
'P': 1, | |
'$': 20, | |
't': 0, | |
'T': 0, | |
'C': 6, | |
'D': 2, | |
'I': 1, | |
} | |
self.timer: Timer = Timer() | |
self._key: Any = None | |
def copy(self) -> 'Player': | |
p = copy.deepcopy(self) | |
return p | |
def wait(self) -> Tuple[int, Dict[str, int]]: | |
dt, benefits = self.timer.tick() | |
accruals = {k: 0 for k in self.resources.keys()} | |
for b in benefits: | |
for k, v in b: | |
self.resources[k] += v | |
accruals[k] += v | |
self._key = None | |
return dt, accruals | |
def take_option(self, option: Option) -> None: | |
cost, benefit = option[0] | |
for k, v in cost: | |
self.resources[k] -= v | |
if 'S' in cost[0]: | |
dt = SMALL_T | |
elif 'B' in cost[0]: | |
dt = BIG_T | |
else: | |
raise Exception('Neither S nor B found in {}'.format(cost)) | |
self.timer.register(dt, benefit) | |
self._key = None | |
def can_wait(self) -> bool: | |
return bool(self.timer.jobs) | |
def key(self) -> Any: | |
if self._key is None: | |
self._key = ( | |
tuple((k, self.resources[k]) for k in sorted(self.resources.keys())), | |
self.timer.key(), | |
) | |
return self._key | |
def __repr__(self) -> str: | |
return 'Player({})'.format(self.key()) | |
def __str__(self) -> str: | |
return 'Player({})'.format(self.key()) | |
def __hash__(self) -> int: | |
return hash(self.key()) | |
def __lt__(self, other: Any) -> bool: | |
return self.key() < other.key() | |
def __eq__(self, other: Any) -> bool: | |
return self.key() == other.key() | |
def options(self) -> List[Option]: | |
return [t for t in trades | |
if all((self.resources[k] >= v for k, v in t[0][0]))] | |
def h(p: Player) -> int: | |
# money we still need to make | |
money_needed = GOAL - p.resources['$'] | |
# Heuristic that *might* be wrong, but gets the same answer as a | |
# known-optimistic heuristic | |
#return int(money_needed // 10) | |
# Heuristic that's guaranteed to be optimistic | |
return money_needed // optimal_gain_rate() | |
def a_star(start: Player) -> Tuple[int, List[Tuple[Player, str]]]: | |
closed_set: Set[Player] = set() | |
came_from: Dict[Player, Tuple[Player, str]] = {} | |
g_score: collections.defaultdict = collections.defaultdict(lambda: 1000) | |
g_score[start] = 0 | |
f_score: List[Tuple[int, Player]] = [] | |
f_score.append((h(start), start)) | |
while f_score: | |
(current_time, current_player) = heapq.heappop(f_score) | |
if current_player.resources['$'] >= GOAL: | |
return (current_time, reconstruct_path(came_from, current_player)) | |
closed_set.add(current_player) | |
# Try waiting for a job to finish | |
if current_player.can_wait(): | |
wait_player = current_player.copy() | |
dt, accruals = wait_player.wait() | |
if wait_player not in closed_set: | |
tentative_g_score: int = g_score[current_player] + dt | |
if tentative_g_score < g_score[wait_player]: | |
g_score[wait_player] = tentative_g_score | |
heapq.heappush( | |
f_score, | |
(tentative_g_score + h(wait_player), wait_player), | |
) | |
came_from[wait_player] = ( | |
current_player, | |
'wait {} (${})'.format( | |
dt, | |
accruals['$'], | |
), | |
) | |
# One edge per timer we can assign | |
for option in current_player.options(): | |
new_player = take_option(current_player, option) | |
if new_player in closed_set: | |
continue | |
tentative_g_score = g_score[current_player] | |
if tentative_g_score < g_score[new_player]: | |
g_score[new_player] = tentative_g_score | |
heapq.heappush(f_score, (tentative_g_score + h(new_player), new_player)) | |
came_from[new_player] = (current_player, option[1]) | |
raise Exception('Goal not found') | |
def reconstruct_path( | |
came_from: Dict[Player, Tuple[Player, str]], | |
current: Player | |
) -> List[Tuple[Player, str]]: | |
path = [] | |
while current in came_from: | |
current, option_str = came_from[current] | |
path.append((current, option_str)) | |
return path[::-1] | |
def take_option(player: Player, option: Option | |
) -> Player: | |
player = player.copy() | |
player.take_option(option) | |
return player | |
t, trace = a_star(Player()) | |
print('Time: {}'.format(t)) | |
print('Path: {}'.format([t[1] for t in trace])) |
Author
jthemphill
commented
Jun 16, 2017
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment