Skip to content

Instantly share code, notes, and snippets.

@asw456
Forked from krishnanraman/MAB.py
Created June 5, 2014 03:38
Show Gist options
  • Save asw456/9ce2a21bebdf6ec092ef to your computer and use it in GitHub Desktop.
Save asw456/9ce2a21bebdf6ec092ef to your computer and use it in GitHub Desktop.
# MAB Algos by Sergey Feldman
import numpy as np
from matplotlib import pylab as plt
#from mpltools import style # uncomment for prettier plots
#style.use(['ggplot'])
# generate all bernoulli rewards ahead of time
def generate_bernoulli_bandit_data(num_samples,K):
CTRs_that_generated_data = np.tile(np.random.rand(K),(num_samples,1))
true_rewards = np.random.rand(num_samples,K) < CTRs_that_generated_data
return true_rewards,CTRs_that_generated_data
# totally random
def random(observed_data):
return np.random.randint(0,len(observed_data))
# the naive algorithm
def naive(observed_data,number_to_explore=100):
totals = observed_data.sum(1) # totals
if np.any(totals < number_to_explore): # if have been explored less than specified
least_explored = np.argmin(totals) # return the one least explored
return least_explored
else: # return the best mean forever
successes = observed_data[:,0] # successes
estimated_means = successes/totals # the current means
best_mean = np.argmax(estimated_means) # the best mean
return best_mean
# the epsilon greedy algorithm
def epsilon_greedy(observed_data,epsilon=0.01):
totals = observed_data.sum(1) # totals
successes = observed_data[:,0] # successes
estimated_means = successes/totals # the current means
best_mean = np.argmax(estimated_means) # the best mean
be_exporatory = np.random.rand() < epsilon # should we explore?
if be_exporatory: # totally random, excluding the best_mean
other_choice = np.random.randint(0,len(observed_data))
while other_choice == best_mean:
other_choice = np.random.randint(0,len(observed_data))
return other_choice
else: # take the best mean
return best_mean
# the UCB algorithm using
# (1 - 1/t) confidence interval using Chernoff-Hoeffding bound)
# for details of this particular confidence bound, see the UCB1-TUNED section, slide 18, of:
# http://lane.compbio.cmu.edu/courses/slides_ucb.pdf
def UCB(observed_data):
t = float(observed_data.sum()) # total number of rounds so far
totals = observed_data.sum(1)
successes = observed_data[:,0]
estimated_means = successes/totals # sample mean
estimated_variances = estimated_means - estimated_means**2
UCB = estimated_means + np.sqrt( np.minimum( estimated_variances + np.sqrt(2*np.log(t)/totals), 0.25 ) * np.log(t)/totals )
return np.argmax(UCB)
# the UCB algorithm - using fixed 95% confidence intervals
# see slide 8 for details:
# http://dept.stat.lsa.umich.edu/~kshedden/Courses/Stat485/Notes/binomial_confidence_intervals.pdf
def UCB_normal(observed_data):
totals = observed_data.sum(1) # totals
successes = observed_data[:,0] # successes
estimated_means = successes/totals # sample mean
estimated_variances = estimated_means - estimated_means**2
UCB = estimated_means + 1.96*np.sqrt(estimated_variances/totals)
return np.argmax(UCB)
# Thompson Sampling
# http://www.economics.uci.edu/~ivan/asmb.874.pdf
# http://camdp.com/blogs/multi-armed-bandits
def thompson_sampling(observed_data):
return np.argmax( np.random.beta(observed_data[:,0], observed_data[:,1]) )
# the bandit algorithm
def run_bandit_alg(true_rewards,CTRs_that_generated_data,choice_func):
num_samples,K = true_rewards.shape
# seed the estimated params
prior_a = 1. # aka successes
prior_b = 1. # aka failures
observed_data = np.zeros((K,2))
observed_data[:,0] += prior_a # allocating the initial conditions
observed_data[:,1] += prior_b
regret = np.zeros(num_samples)
for i in range(0,num_samples):
# pulling a lever & updating observed_data
this_choice = choice_func(observed_data)
# update parameters
if true_rewards[i,this_choice] == 1:
update_ind = 0
else:
update_ind = 1
observed_data[this_choice,update_ind] += 1
# updated expected regret
regret[i] = np.max(CTRs_that_generated_data[i,:]) - CTRs_that_generated_data[i,this_choice]
cum_regret = np.cumsum(regret)
return cum_regret
# define number of samples and number of choices
num_samples = 10000
K = 5
number_experiments = 100
regret_accumulator = np.zeros((num_samples,5))
for i in range(number_experiments):
print "Running experiment:", i+1
true_rewards,CTRs_that_generated_data = generate_bernoulli_bandit_data(num_samples,K)
regret_accumulator[:,0] += run_bandit_alg(true_rewards,CTRs_that_generated_data,random)
regret_accumulator[:,1] += run_bandit_alg(true_rewards,CTRs_that_generated_data,naive)
regret_accumulator[:,2] += run_bandit_alg(true_rewards,CTRs_that_generated_data,epsilon_greedy)
regret_accumulator[:,3] += run_bandit_alg(true_rewards,CTRs_that_generated_data,UCB)
regret_accumulator[:,4] += run_bandit_alg(true_rewards,CTRs_that_generated_data,UCB_normal)
plt.semilogy(regret_accumulator/number_experiments)
plt.title('Simulated Bandit Performance for K = 5')
plt.ylabel('Cumulative Expected Regret')
plt.xlabel('Round Index')
plt.legend(('Random','Naive','Epsilon-Greedy','(1 - 1/t) UCB','95% UCB'),loc='lower right')
plt.show()
// Scala port by Krishnan Raman
import math.random
/*
So there are num_products products in your shop (think amazon, not safeway)
Let num_products ~ [1,10]
You want to show one of num_products product recommendations.
The user either clicks on it or do not, and we log this binary reward.
Next, we proceed to the next user.
Repeat all over again
*/
object MultiArmedBernoulliBandit extends App {
/*
Generate all Bernoulli rewards ahead of time
*/
def generate_bernoulli_bandit_data(num_samples:Int, num_products:Int) = {
// generate as many random numbers as number of products
val x = Seq.fill(num_products)(random)
// make one copy for each user
val CTRs_that_generated_data = Seq.tabulate(num_samples, num_products)((i,j) => x(j))
// for each user, generate as many random numbers as number of products
val users = Seq.fill(num_samples, num_products)(random)
// so the user gets a reward if his random guess is less than the CTR
val true_rewards = Seq.tabulate(num_samples, num_products){
(i,j) => users(i)(j) < CTRs_that_generated_data(i)(j)
}
(true_rewards, CTRs_that_generated_data)
}
def shape[T](a:Seq[Seq[T]]):(Int, Int) = (a(0).size, a.size)
def run_bandit_alg(
true_rewards: Seq[Seq[Boolean]],
CTRs_that_generated_data: Seq[Seq[Double]],
choice_func: Array[Array[Int]] => Int ) = {
// extract the # of rows & columns given a double dim array
val (num_products, num_samples) = shape(true_rewards)
//printf("num samples %d num_products %d\n", num_samples, num_products)
// make a num_product X 2 array ie. a 2 column array, with 1 row per product
// on the lhs are auccesses
// on the rhs are failures
// for every user, the MAB algo picks a product
// if the product it picks is the one the user wants, we increment the product's success column
// else we increment failure
val observed_data = Array.fill(num_products, 2)(1)
// every user has a regret number
// that number is the max ctr of user minus the ctr for his choice
// if the choice is the one that had the max ctr, the regret is zero
val regret = Array.fill(num_samples)(0.0)
for(i <- 0 until num_samples) {
// pulling a lever & updating observed_data
val this_choice = choice_func(observed_data)
// update parameters
val update_ind = if (true_rewards(i)(this_choice)) 0 else 1
observed_data(this_choice)(update_ind) += 1
// updated expected regret
regret(i) = CTRs_that_generated_data(i).max - CTRs_that_generated_data(i)(this_choice)
}
regret.scanLeft(0.0)((a,b)=>a+b).tail // cumulative regret
}
def totallyRandom(observed_data: Array[Array[Int]]):Int = util.Random.nextInt(observed_data.size)
// define number of samples and number of choices
def num_samples = 10
def num_products = 5
def number_experiments = 1
for (i<- 0 until number_experiments) {
printf("Running experiment: %d\n", i+1)
val precomputed = generate_bernoulli_bandit_data(num_samples, num_products)
val true_rewards = precomputed._1
val CTRs_that_generated_data = precomputed._2
val cum_regret = run_bandit_alg(true_rewards,CTRs_that_generated_data,totallyRandom _)
println("Cumulative Regret:" + cum_regret.mkString(","))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment