@Jasmeet107
Last active December 16, 2015 02:16
9.66 Project
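card_game.py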
import numpy as np
import scipy as sp
from scipy import misc
from scipy import stats
from scipy import special
import random
import matplotlib.pyplot as plt
#The card set consists of one of each value card. Ace is given the value 11, but will be changed to 1 if/when it benefits the player.
card_set = {'a':11, '2':2, '3':3, '4':4, '5':5, '6':6, '7':7, '8':8, '9':9, '10':10, 'j':10, 'q':10, 'k':10}
#Each card hand value is given a score.
points = {1:1, 2:2, 3:3, 4:4, 5:5, 6:6, 7:7, 8:8, 9:9, 10:10, 11:11, 12:12, 13:13, 14:14, 15:15, 16:25, 17:35, 18:45, 19:60, 20:75, 21:100, -1:-50}
#Param card_hand is some array of cards from the card set
#Returns the value (sum of all card values) of a hand
#Will use the most beneficial value of A (1 or 11)
def value(card_hand):
    current_score = 0
    for card in card_hand:
        current_score += card_set[card]
    num_aces = len(filter(lambda card: card == 'a', card_hand))
    while (current_score > 21) and (num_aces > 0):
        current_score -= 10
        num_aces -= 1
    if current_score > 21:
        current_score = -1
    return current_score
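#e.g. value(['a', '9']) == 20 (ace counted as 11), value(['a', '9', '5']) == 15 (ace demoted to 1),
#value(['k', 'q', '5']) == -1 (bust)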
#Param card_hand is some array of cards from the card set
#Returns score of current hand
def score(card_hand):
    hand_value = value(card_hand)
    return points[hand_value]
#Param card_hand is some array of cards from the card set
#Given a card hand, calculates the increase in expected value
#of taking another card. Can return a negative value.
def expected_value_increase(card_hand):
    current_score = score(card_hand)
    possible_scores = []
    for card in card_set.keys():
        new_hand = card_hand + [card]
        new_score = score(new_hand)
        possible_scores.append(new_score)
    avg_score = float(sum(possible_scores))/len(possible_scores)
    delta_expected_value = avg_score - current_score
    return delta_expected_value
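#e.g. expected_value_increase(['5', '2']) is positive (no third card can bust a hand of 7),
#while expected_value_increase(['10', '9']) is strongly negative (11 of the 13 possible cards bust a 19)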
#Generates a random hand with sum <= 21, then adds one more random card to that hand
#(so each generated hand ends with the card that pushes it over 21)
def generate_hands(card_set):
    maximum_hands = []
    for i in range(20):
        current_hand = []
        while value(current_hand) >= 0:
            current_hand.append(random.choice(card_set.keys()))
        maximum_hands.append(current_hand)
    return maximum_hands
#Simple Python game for a player to play a game of 21
#Records all decisions made by the human
#max_hand is the entire hand, which will be dealt to the player one card at a time
#dataf is the file to which the human decision data will be written
def play_game(max_hand, dataf):
    current_hand = [max_hand[0]]
    still_playing = True
    while still_playing:
        print "This is your hand: %s (sum: %d, worth %d points)" % (current_hand, value(current_hand), score(current_hand))
        if value(current_hand) < 0:
            print "You lost."
            still_playing = False
        else:
            choice = raw_input("Take another card? (y/n): ")
            dataf.write("%s,%d\n" % ("".join(current_hand), choice == "y"))
            if choice == "n":
                still_playing = False
            elif choice == "y":
                current_hand.append(max_hand[len(current_hand)])
            else:
                print "Please answer 'y' or 'n'."
    print "Round over! Your score for this round is %d points" % score(current_hand)
    print
    return score(current_hand)
#Initiates the Python Blackjack Game with a set of hands
def play_game_session(hands, dataf):
    random.shuffle(hands)
    total_score = 0
    for hand in hands:
        total_score += play_game(hand, dataf)
        print "Your score is now %d points" % total_score
        again = raw_input("Would you like to play another round? (y/n): ")
        print
        if again != "n" and again != "y":
            print "Please answer 'y' or 'n'."
        elif again == "n":
            break
    print "Thanks for playing!"
#Given the data file, aggregates all the samples from each experiment
def aggregate_totals(dataf):
    frequencies = {}
    for line in dataf:
        pieces = line.strip().split(",")
        hand = "".join(sorted(pieces[0]))
        take = pieces[1] == "1"
        if hand not in frequencies:
            frequencies[hand] = (1, 0 + take)
        else:
            frequencies[hand] = (frequencies[hand][0] + 1, frequencies[hand][1] + take)
    return frequencies
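#e.g. the lines "52,1" and "52j,0" aggregate to {"25": (1, 1), "25j": (1, 0)}; each hand string is
#sorted so the same cards dealt in a different order count as the same hand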
#Converts the aggregated frequencies into parallel lists for model fitting
#frequencies is a dictionary returned by aggregate_totals
def format_data(frequencies):
    delta_expected_value = []
    num_trials = []
    num_takes = []
    for hand_string, results in frequencies.items():
        hand_string = hand_string.replace("01", "t")
        hand = list(hand_string)
        hand = [character.replace("t", "10") for character in hand]
        delta_expected_value.append(expected_value_increase(hand))
        num_trials.append(results[0])
        num_takes.append(results[1])
    return (delta_expected_value, num_trials, num_takes)
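#The "01" -> "t" -> "10" substitution restores the two-character "10" card: sorted("10k") gives "01k",
#which becomes "tk" and is then split back into the hand ['10', 'k']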
#uncomment while playing game
#dataf = open("data.txt", "ab")
#uncomment while aggregating
#dataf = open("data.txt", "rb")
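#Illustrative usage sketch (an assumption about intended use, mirroring the open() hints above):
#first collect decisions from a human player, then aggregate them for model fitting.
if __name__ == "__main__":
    #Collection: deal 20 randomly generated hands and record every hit/stand decision
    dataf = open("data.txt", "ab")
    play_game_session(generate_hands(card_set), dataf)
    dataf.close()
    #Aggregation: per-hand trial/take counts plus the expected-value change of hitting each hand
    dataf = open("data.txt", "rb")
    (dev, trials, takes) = format_data(aggregate_totals(dataf))
    dataf.close()

data.txt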
q,1
q9,0
5,1
52,1
52j,1
q,1
q10,0
5,1
52,1
52j,0
10,1
10k,0
6,1
65,1
65k,0
2,1
2k,1
7,1
75,1
754,0
q,1
q10,0
q,1
q9,0
6,1
65,1
65k,0
5,1
52,1
52j,0
9,1
9q,0
2,1
2k,1
9,1
9q,0
7,1
75,1
754,0
10,1
10k,0
3,1
35,1
35a,0
3,1
32,1
329,1
5,1
52,1
52j,0
q,1
q10,0
2,1
2k,1
6,1
65,1
65k,0
q,1
q9,0
6,1
65,1
65k,0
q,1
q9,0
10,1
10k,0
7,1
75,1
754,0
5,1
52,1
52j,0
3,1
32,1
329,1
9,1
9q,0
3,1
35,1
35a,0
2,1
2k,1
q,1
q10,0
9,1
9q,0
10,1
10k,0
2,1
2k,1
3,1
32,1
329,1
6,1
65,1
65k,0
q,1
q9,0
q,1
q10,0
7,1
75,1
754,1
3,1
35,1
35a,0
5,1
52,1
52j,0
2,1
2k,1
5,1
52,1
52j,0
6,1
65,1
65k,0
q,1
q9,0
10,1
10k,0
3,1
32,1
329,0
7,1
75,1
754,0
3,1
35,1
35a,0
q,1
q10,0
9,1
9q,0
q,1
q9,0
7,1
75,1
754,0
q,1
q10,0
5,1
52,1
52j,0
3,1
35,1
35a,0
2,1
2k,1
10,1
10k,0
9,1
9q,0
6,1
65,1
65k,0
3,1
32,1
329,1
3,1
35,1
35a,0
2,1
2k,1
10,1
10k,0
3,1
32,1
329,0
5,1
52,1
52j,0
q,1
q9,0
6,1
65,1
65k,0
q,1
q10,0
9,1
9q,0
7,1
75,1
754,0
9,1
9q,0
9q,0
3,1
32,1
329,1
10,1
10k,0
q,1
q9,0
5,1
52,1
52j,0
5,1
52,1
52j,0
10,1
10k,0
q,1
q9,0
6,1
65,1
65k,0
3,1
35,1
35a,0
2,1
2k,1
7,1
75,1
754,1
9,1
9q,0
q,1
q10,0
3,1
32,1
329,1
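metropolis.py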
import numpy as np
import scipy as sp
from scipy import misc
from scipy import stats
from scipy import special
import random
import matplotlib.pyplot as plt
from card_game import *
#Richards' curve, also known as the Generalized Logistic Function
#b: growth rate
#v>0: affects near which asymptote maximum growth occurs
#q: is related to the value richards(0)
#t: input
def richards(b, q, log_v, t):
    a = 0
    c = 1
    k = 1
    return a + (k - a)/np.power(c + q*np.exp(-1*b*t), 1.0/np.exp(log_v))
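#With a = 0, c = 1, k = 1 this reduces to 1/(1 + q*exp(-b*t))**(1/exp(log_v)): a sigmoid rising
#from 0 to 1 (the ordinary logistic curve when q = 1 and log_v = 0)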
#Linear function
#a: slope
#b: y-intercept
#t: input
def linear(a, b, _, t):
    return a*t + b
#Step Function
#p_min: minimum value
#p_max: maximum value
#thresh: threshold at which to use maximum instead of minimum value
#t: input
def step(p_min, p_max, thresh, t):
    if t > thresh:
        return p_max
    else:
        return p_min
#Threshold (hinge) function
#a: slope above the threshold
#b: starting value
#thresh: threshold
#t: input
def threshold(a, b, thresh, t):
    if t < thresh:
        return b
    else:
        return a*(t - thresh) + b
#Beta function
#alpha and beta are original parameters to the Beta function
#phi_1: log(alpha/beta)
#phi_2: log(alpha+beta)
#t: input
def beta(phi_1, phi_2, _, t):
    k1 = np.exp(phi_1)
    k2 = np.exp(phi_2)
    a = k1*k2/(k1 + 1)
    b = k2/(k1 + 1)
    return sp.stats.beta.pdf(t, a, b)
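#Note: with this parameterization a/b = e^phi_1 and a + b = e^phi_2, which recovers alpha and beta
#from phi_1 and phi_2 as described above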
#Inverse Tan Function
#a: affects amplitude
#b: affects frequency
#c: affects offset
#t: input
def inverse_tan(a, b, c, t):
    return a*np.arctan(float(t)/b) + c
###Log-priors for each function's parameters. Most return 0 (a flat log-prior), meaning all parameter values are a priori equally likely###
def richards_prior(b, q, log_v):
    return 0
def linear_prior(a, b, _):
    return 0
def step_prior(p_min, p_max, thresh):
    return 0
def threshold_prior(a, b, thresh):
    return 0
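#The density below appears to be the weakly-informative (alpha+beta)^(-5/2) prior on (alpha, beta)
#multiplied by the Jacobian a*b of the (phi_1, phi_2) -> (alpha, beta) change of variables;
#it is returned on the log scale so it combines with the flat (0) log-priors above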
def beta_prior(phi_1, phi_2, _):
    k1 = np.exp(phi_1)
    k2 = np.exp(phi_2)
    a = k1*k2/(k1 + 1)
    b = k2/(k1 + 1)
    return np.log(a*b*np.power((a + b), -5./2))
def inverse_tan_prior(a, b, c):
    return 0
graph_functions = [richards, linear, step, threshold, beta, inverse_tan]
graph_functions_priors = [richards_prior, linear_prior, step_prior, threshold_prior, beta_prior, inverse_tan_prior]
#Maps the last theta value (theta[3]) to one of the candidate graph functions by normalizing
#e^theta[3] to its leading mantissa in [0.1, 1) and scaling that onto the indices 0..len(graph_functions)-1
def assign_graph_value(theta):
    n = np.exp(theta[3])
    graph_val = n/np.power(10, np.floor(np.log10(n)) + 1)
    graph_id = int(float(graph_val)*(len(graph_functions)))
    if graph_id < 0:
        graph_id = 0
    if graph_id >= len(graph_functions):
        graph_id = len(graph_functions) - 1
    return graph_id
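#e.g. theta[3] = 0 gives n = 1, graph_val = 0.1 and graph_id = 0 (richards); graph_val always
#falls in [0.1, 1), so every candidate function is reachable by varying theta[3]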
#Returns the log prior of the parameters for the function, given theta
def find_parameters_log_prior(theta):
    return graph_functions_priors[assign_graph_value(theta)](theta[0], theta[1], theta[2])
#Returns the log prior of the curve, given theta
#Jacobian of graph value transformation = e^theta[3]
def find_function_log_prior(theta):
    return theta[3]
#Parameters: the number of trials and number of decisions made to take a card, as well as the increase
#in expected value and the theta value.
#theta = [theta1, theta2, theta3, theta4], where theta1, theta2, and theta3 are the curve parameters
#theta4 represents which graph function is being used
#Returns the log_likelihood of the sample data given our theta value
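#The likelihood is binomial: log C(num_trials, num_takes) + num_takes*log(p) + (num_trials-num_takes)*log(1-p),
#where p is the take-probability predicted by the chosen curve at delta_expected_value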
def find_log_likelihood(num_trials, num_takes, delta_expected_value, theta):
    graph_id = assign_graph_value(theta)
    graph_fn = graph_functions[graph_id]
    p = graph_fn(theta[0], theta[1], theta[2], delta_expected_value)
    if p <= 0:
        p = 1e-9
    if p >= 1:
        p = 1 - 1e-9
    log_likelihood = sp.special.gammaln(num_trials+1) - sp.special.gammaln(num_takes+1) - sp.special.gammaln(num_trials-num_takes+1) + num_takes*np.log(p) + (num_trials-num_takes)*np.log(1-p)
    return log_likelihood
#Returns the log_likelihood of vectors of sample data given our theta value
#num_trials and num_takes are vectors such that num_takes/num_trials represents
#the frequency a card was chosen when presented with that hand
def find_log_likelihood_vector(num_trials, num_takes, delta_expected_value, theta):
    log_likelihood = 0
    for i in range(len(num_trials)):
        log_likelihood += find_log_likelihood(num_trials[i], num_takes[i], delta_expected_value[i], theta)
    return log_likelihood
#Returns the (unnormalized) log posterior of theta
def find_log_posterior(num_trials, num_takes, delta_expected_value, theta):
    return find_log_likelihood_vector(num_trials, num_takes, delta_expected_value, theta) + find_parameters_log_prior(theta) + find_function_log_prior(theta)
#Metropolis Algorithm: takes an initial theta, log posterior function, variances to use in jump distribution, and number of samples
#Returns an array of sampled theta values
def metropolis(theta_initial, log_posterior_fn, jump_variances, num_samples):
    theta_current = theta_initial
    samples = np.zeros((4, num_samples))
    for i in range(num_samples):
        log_posterior_theta_current = log_posterior_fn(theta_current)
        #Propose a new theta from a Gaussian jump distribution centered at the current theta
        #(jump_variances is the covariance matrix of the proposal)
        theta_star = np.random.multivariate_normal(theta_current, jump_variances)
        #Standard Metropolis acceptance: proposals that improve the log posterior (accept > 1)
        #are always taken; worse proposals are taken with probability accept
        accept = np.exp(log_posterior_fn(theta_star) - log_posterior_theta_current)
        if np.random.random() < accept:
            samples[:, i] = theta_star
            theta_current = theta_star
        else:
            samples[:, i] = theta_current
    return samples
#Groups samples by graph type
def group_samples(samples):
    grouped_samples = {}
    for row in samples.T:
        graph_id = assign_graph_value(row)
        if graph_id not in grouped_samples:
            grouped_samples[graph_id] = []
        grouped_samples[graph_id].append(row)
    return grouped_samples
#Counts the number of times each graph type is chosen in the algorithm
def count_graph_types(grouped_samples):
    graph_counts = {}
    for key, sample_set in grouped_samples.items():
        graph_counts[key] = len(sample_set)
    return graph_counts
#Given the grouped samples, returns the medians of the sampled
#theta values for the parameters of each graph
def find_medians(grouped_samples):
    medians = {}
    for key, sample_set in grouped_samples.items():
        medians[key] = np.median(np.array(sample_set), axis=0)
    return medians
#Given the grouped samples, returns the means of the sampled
#theta values for the parameters of each graph
def find_means(grouped_samples):
    means = {}
    for key, sample_set in grouped_samples.items():
        means[key] = np.mean(np.array(sample_set), axis=0)
    return means
#Runs Metropolis once, chooses the best graph, then runs metropolis num_epochs-1 times where for each run, the
#initial theta values are replaced with the mean theta values from the previous run
def run_metropolis(num_samples, variances, theta_initial, posterior, num_epochs, variance_factor=0.1):
    theta_current = theta_initial
    current_variances = [[variances[0], 0, 0, 0],
                         [0, variances[1], 0, 0],
                         [0, 0, variances[2], 0],
                         [0, 0, 0, variances[3]]]
    for i in range(num_epochs):
        samples = metropolis(theta_current, posterior, current_variances, num_samples)
        grouped_samples = group_samples(samples)
        counts = count_graph_types(grouped_samples)
        best_graph = max(counts, key=counts.get)
        means = find_means(grouped_samples)
        #Restart from the mean parameters of the most-visited graph; theta[3] is taken from one of
        #that graph's samples so the next epoch starts on the same curve
        theta_current = [means[best_graph][0], means[best_graph][1], means[best_graph][2], grouped_samples[best_graph][0][3]]
        #Shrink the jump variances each epoch and freeze theta[3] (zero jump variance) so later
        #epochs refine the chosen curve's parameters
        current_variances = [[variance_factor*current_variances[0][0], 0, 0, 0],
                             [0, variance_factor*current_variances[1][1], 0, 0],
                             [0, 0, variance_factor*current_variances[2][2], 0],
                             [0, 0, 0, 0]]
    return samples
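#Driver script: reads the collected decisions from data.txt, fits the candidate curves with the
#Metropolis sampler above, and plots the Richards-curve fit against the observed take frequencies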
import numpy as np
import scipy as sp
from scipy import misc
from scipy import stats
from scipy import special
import random
import matplotlib.pyplot as plt
from card_game import *
from metropolis import *
dataf = open("data.txt", "rb")
(dev, trials, takes) = format_data(aggregate_totals(dataf))
#Wrapper function for the log posterior of theta; theta[2] <= 0 is rejected outright
def posterior(theta):
    if theta[2] <= 0:
        return float("-inf")
    return find_log_posterior(trials, takes, dev, theta)
#Plots Richards' graph given theta values
def plot_richards_graph(theta):
    probs = [num_takes*1.0/num_trials for (num_takes, num_trials) in zip(takes, trials)]
    tvals = np.arange(-150.0, 50, 0.1)
    plt.figure()
    plt.scatter(dev, probs)
    plt.plot(tvals, [richards(theta[0], theta[1], theta[2], t) for t in tvals])
    plt.show()
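#Run the sampler (100 samples per epoch, 10 epochs) starting from theta = [0, 0, 0, 0]; indexing the
#means with [0] assumes at least one sample in the final epoch mapped to graph 0 (richards)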
samples = run_metropolis(100, [.3, .3, .3, .3], [0, 0, 0, 0], posterior, 10)
plot_richards_graph(find_means(group_samples(samples))[0])