Skip to content

Instantly share code, notes, and snippets.

@yukunlin
Last active April 19, 2016 05:23
Show Gist options
  • Save yukunlin/1a321467f38c4c69d773fe8b36b70034 to your computer and use it in GitHub Desktop.
Save yukunlin/1a321467f38c4c69d773fe8b36b70034 to your computer and use it in GitHub Desktop.
Find a solution to the countdown problem (original and generalized versions)
from fractions import Fraction
from multiprocessing import Process, Pipe
from itertools import izip
class Operations:
    """Lookup tables for the six ways a number can be combined with a sub-expression.

    The lists are index-aligned: entry ``i`` of a precondition list, of ``partial_targets``
    and of ``solution_formats`` all describe the same operation. For each operation,
    ``num`` is the number being used and the trailing comment shows how ``target`` is
    rebuilt from ``num`` and the value the remaining sub-expression must reach.
    """

    # Generalized game: every intermediate value is acceptable, so an operation is only
    # ruled out when it would require dividing by zero.
    generalized_preconditions = [
        lambda target, num: True,                            # num + (target - num)
        lambda target, num: not num == 0,                    # num * (target / num)
        lambda target, num: True,                            # (target + num) - num
        lambda target, num: True,                            # num - (num - target)
        lambda target, num: not num == 0,                    # (num * target) / num
        lambda target, num: not (num == 0 or target == 0),   # num / (num / target)
    ]

    # Strict game: intermediate values are limited to natural numbers, so differences
    # must be positive and divisions must be exact.
    restricted_preconditions = [
        lambda target, num: num < target,                               # num + (target - num)
        lambda target, num: not (num == 0 or target % num != 0),        # num * (target / num)
        lambda target, num: True,                                       # (target + num) - num
        lambda target, num: target < num,                               # num - (num - target)
        lambda target, num: not num == 0,                               # (num * target) / num
        lambda target, num: not (num == 0 or target == 0
                                 or num % target != 0),                 # num / (num / target)
    ]

    # Value the sub-expression must evaluate to once ``num`` has been used.
    partial_targets = [
        lambda target, num: target - num,            # num + (target - num)
        lambda target, num: Fraction(target, num),   # num * (target / num)
        lambda target, num: num + target,            # (target + num) - num
        lambda target, num: num - target,            # num - (num - target)
        lambda target, num: target * num,            # (num * target) / num
        lambda target, num: Fraction(num, target),   # num / (num / target)
    ]

    @staticmethod
    def plus_minus_exists(x):
        """Return True if expression string ``x`` contains an addition or a subtraction.

        Subtraction is detected as ``'- '`` (minus followed by a space), so the sign of a
        negative number such as ``'-3'`` does not count.
        """
        return any(token in x for token in ('+', '- '))

    @staticmethod
    def operator_exists(x):
        """Return True if expression string ``x`` contains any of ``+``, ``- ``, ``*``, ``/``."""
        return Operations.plus_minus_exists(x) or '*' in x or '/' in x

    # Each entry maps the sub-expression string to a format template in which {0} is the
    # number being used and {1} is the sub-expression; parentheses are added only when the
    # sub-expression contains operators that would otherwise bind incorrectly.
    solution_formats = [
        # num + (target - num)
        lambda part_expr: '{0} + {1}',
        # num * (target / num)
        lambda part_expr: ('{0} * {1}' if not Operations.plus_minus_exists(part_expr)
                           else '{0} * ({1})'),
        # (target + num) - num
        lambda part_expr: '{1} - {0}',
        # num - (num - target)
        lambda part_expr: ('{0} - {1}' if not Operations.plus_minus_exists(part_expr)
                           else '{0} - ({1})'),
        # (num * target) / num
        lambda part_expr: ('{1} / {0}' if not Operations.plus_minus_exists(part_expr)
                           else '({1}) / {0}'),
        # num / (num / target)
        lambda part_expr: ('{0} / {1}' if not Operations.operator_exists(part_expr)
                           else '{0} / ({1})'),
    ]
class CountdownSolver(object):
    """Search for an arithmetic expression over the given numbers that equals a target.

    Every binary operation in a candidate expression has a single number as one operand
    and an expression built from all remaining numbers as the other. Expressions that
    combine two compound operands, e.g. ``(a + b) * (c + d)``, are therefore not explored.
    """

    def __init__(self, restricted=False):
        """Constructor

        Args:
            restricted (bool): Set to true to limit intermediate values to natural numbers,
                defaults to False
        """
        self._restricted = restricted
        if restricted:
            self._preconditions = Operations.restricted_preconditions
        else:
            self._preconditions = Operations.generalized_preconditions

    def _partial_solve(self, target, num_to_use, numbers):
        """Helper for the _solve_sequential function

        Attempt to find a solution where num_to_use is one argument of a binary operation,
        and numbers make up an arithmetic expression that is the other argument.

        Args:
            target (int): The number that the solution expression should evaluate to.
            num_to_use (int): The number that makes up one argument of a binary operation.
            numbers (tuple): The numbers that will be used in the arithmetic expression
                that makes up the other argument of the binary operation.

        Returns:
            solution_expression: The solution expression as a string if it exists,
                None otherwise
        """
        # zip (not the Python 2 only izip) keeps this runnable on Python 2 and 3; the
        # three tables hold just six index-aligned entries.
        operations = zip(self._preconditions, Operations.partial_targets,
                         Operations.solution_formats)
        for pre_condition, partial_target, solution_format in operations:
            if not pre_condition(target, num_to_use):
                continue
            partial_solution = self._solve_sequential(
                partial_target(target, num_to_use), numbers)
            if partial_solution:
                template = solution_format(partial_solution)
                return template.format(num_to_use, partial_solution)
        return None

    def _base_cases(self, target, numbers):
        """Resolve the inputs that need no recursion.

        Args:
            target (int): The number the expression should evaluate to.
            numbers (sequence): The numbers still available.

        Returns:
            tuple: ``(can_return, solution)``. ``can_return`` is True when fewer than two
                numbers are left; ``solution`` is then ``str(target)`` if the single
                remaining number equals target, None otherwise.
        """
        if len(numbers) > 1:
            return False, None
        if len(numbers) == 1 and numbers[0] == target:
            return True, str(target)
        # nothing left to use, or the only number left is not the target
        return True, None

    @staticmethod
    def _split_off_each(numbers):
        """Return ``(number, remaining_numbers)`` for every position in numbers."""
        numbers = tuple(numbers)
        return [(num, numbers[:i] + numbers[i + 1:]) for i, num in enumerate(numbers)]

    def _solve_sequential(self, target, numbers):
        """Solve for an expression that evaluates to target

        Find an arithmetic expression that evaluates to target using all numbers, among the
        expression shapes this solver explores.

        Args:
            target (int): The target that the expression should evaluate to.
            numbers (sequence): The numbers to use in the solution expression

        Returns:
            solution_expression: The solution expression as a string if it exists,
                None otherwise.
        """
        can_return, rval = self._base_cases(target, numbers)
        if can_return:
            return rval

        for num_to_use, rest in self._split_off_each(numbers):
            result = self._partial_solve(target, num_to_use, rest)
            # Stop at first solution found
            if result:
                return result
        # no solution
        return None

    @staticmethod
    def _solve_job(restricted, target, num_to_use, numbers, conn):
        """Worker entry point: solve one top-level branch and send the result over conn.

        This is a static method taking only plain values (rather than a closure over a
        solver instance) so that it can also be used as a process target when arguments
        have to be pickled; the solver is rebuilt inside the worker from ``restricted``.
        """
        try:
            solver = CountdownSolver(restricted=restricted)
            conn.send(solver._partial_solve(target, num_to_use, numbers))
        finally:
            conn.close()

    def _solve_parallel(self, target, numbers):
        """Like _solve_sequential, but parallelizes the first level of recursion

        One worker process is started per choice of the first number. The first non-empty
        result received is returned; a worker that exits without reporting is treated as
        having found no solution. All workers are terminated and joined, and all pipe ends
        closed, before returning.
        """
        can_return, rval = self._base_cases(target, numbers)
        if can_return:
            return rval

        processes = []
        connections = []
        try:
            pending = []
            for num_to_use, rest in self._split_off_each(numbers):
                parent_conn, child_conn = Pipe()
                connections.extend((parent_conn, child_conn))
                worker = Process(
                    target=CountdownSolver._solve_job,
                    args=(self._restricted, target, num_to_use, rest, child_conn))
                worker.start()
                processes.append(worker)
                pending.append((worker, parent_conn))

            # block for 0.01s in total when polling every worker once
            poll_timeout = 0.01 / len(pending)
            while pending:
                unresolved = []
                for worker, parent_conn in pending:
                    # Sample liveness BEFORE polling: if the worker had already exited,
                    # anything it sent is already in the pipe, so a negative poll then
                    # really means it will never report.
                    was_alive = worker.is_alive()
                    if parent_conn.poll(poll_timeout):
                        candidate_sol = parent_conn.recv()
                        if candidate_sol:
                            return candidate_sol
                        # this branch reported "no solution"; stop polling it
                    elif was_alive:
                        unresolved.append((worker, parent_conn))
                pending = unresolved
            # all possible branches have no solution
            return None
        finally:
            # clean up: stop remaining workers, reap them and release the pipes
            for worker in processes:
                worker.terminate()
                worker.join()
            for conn in connections:
                conn.close()

    def solve(self, target, numbers, parallize=True):
        """Wrapper for _solve_parallel or _solve_sequential

        Args:
            target (int): The number the expression should evaluate to.
            numbers (iterable): The numbers that must all be used in the expression.
            parallize (bool): Search the top-level branches in separate processes when
                True (the default), sequentially in this process when False.

        Returns:
            The solution expression as a string, or None if none was found. In restricted
            mode a negative target or any negative number yields None immediately.
        """
        # Materialize once so that any iterable (not only lists) can be measured, sliced
        # and traversed repeatedly below.
        numbers = tuple(numbers)
        if self._restricted:
            if target < 0:
                return None
            if not all(x >= 0 for x in numbers):
                return None
        solver = self._solve_parallel if parallize else self._solve_sequential
        return solver(target, numbers)
def parse_cli_args(cli_args):
    """Split raw command-line arguments into solver options and numbers.

    Args:
        cli_args (list): The command-line arguments without the program name.

    Returns:
        tuple: ``(restricted, parallize, numeric_args)``. ``restricted`` is True when
            ``-r`` is present, ``parallize`` is False when ``-s`` is present, and
            ``numeric_args`` is the list of all other arguments converted to int
            (target first, then the numbers to use).

    Raises:
        ValueError: If an argument other than ``-r``/``-s`` is not an integer.
    """
    flags = ('-r', '-s')
    numeric_args = [int(arg) for arg in cli_args if arg not in flags]
    return '-r' in cli_args, '-s' not in cli_args, numeric_args


if __name__ == '__main__':
    # python countdown_solver.py 24 3 3 8 8
    import sys

    cli_args = sys.argv[1:]
    if '-h' in cli_args:
        print('countdown_solver.py [-r] [-s] target num1 num2 ...')
        print('')
        print('Required arguments')
        print(' target number that expression must evaluate to')
        print(' num1 num2 ... numbers that must be used in the expression')
        print('')
        print('Optional arguments')
        print(' -r restrict intermediate expressions to natural numbers')
        print(' -s sequential search, disables parallelization')
        sys.exit(0)

    try:
        restricted, parallize, numeric_args = parse_cli_args(cli_args)
    except ValueError as err:
        sys.stderr.write('Target and numbers must be integers: {0}\n'.format(err))
        sys.exit(1)

    # Count only the numeric arguments: flags such as -r/-s must not satisfy the
    # "target plus at least one number" requirement.
    if len(numeric_args) < 2:
        sys.stderr.write('Must specify target and at least one other number, ')
        sys.stderr.write('i.e. countdown_solver.py 24 3 3 8 8\n')
        sys.exit(1)

    solver = CountdownSolver(restricted=restricted)
    target = numeric_args[0]
    numbers = numeric_args[1:]
    print(solver.solve(target, numbers, parallize=parallize))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment