Skip to content

Instantly share code, notes, and snippets.

Forked from krishnanraman/
Created June 5, 2014 03:38
Show Gist options
  • Save asw456/9ce2a21bebdf6ec092ef to your computer and use it in GitHub Desktop.
Save asw456/9ce2a21bebdf6ec092ef to your computer and use it in GitHub Desktop.
# MAB Algos by Sergey Feldman
import numpy as np
from matplotlib import pylab as plt
#from mpltools import style # uncomment for prettier plots
# generate all bernoulli rewards ahead of time
def generate_bernoulli_bandit_data(num_samples,K):
CTRs_that_generated_data = np.tile(np.random.rand(K),(num_samples,1))
true_rewards = np.random.rand(num_samples,K) < CTRs_that_generated_data
return true_rewards,CTRs_that_generated_data
# totally random
def random(observed_data):
return np.random.randint(0,len(observed_data))
# the naive algorithm
def naive(observed_data,number_to_explore=100):
totals = observed_data.sum(1) # totals
if np.any(totals < number_to_explore): # if have been explored less than specified
least_explored = np.argmin(totals) # return the one least explored
return least_explored
else: # return the best mean forever
successes = observed_data[:,0] # successes
estimated_means = successes/totals # the current means
best_mean = np.argmax(estimated_means) # the best mean
return best_mean
# the epsilon greedy algorithm
def epsilon_greedy(observed_data,epsilon=0.01):
totals = observed_data.sum(1) # totals
successes = observed_data[:,0] # successes
estimated_means = successes/totals # the current means
best_mean = np.argmax(estimated_means) # the best mean
be_exporatory = np.random.rand() < epsilon # should we explore?
if be_exporatory: # totally random, excluding the best_mean
other_choice = np.random.randint(0,len(observed_data))
while other_choice == best_mean:
other_choice = np.random.randint(0,len(observed_data))
return other_choice
else: # take the best mean
return best_mean
# the UCB algorithm using
# (1 - 1/t) confidence interval using Chernoff-Hoeffding bound)
# for details of this particular confidence bound, see the UCB1-TUNED section, slide 18, of:
def UCB(observed_data):
t = float(observed_data.sum()) # total number of rounds so far
totals = observed_data.sum(1)
successes = observed_data[:,0]
estimated_means = successes/totals # sample mean
estimated_variances = estimated_means - estimated_means**2
UCB = estimated_means + np.sqrt( np.minimum( estimated_variances + np.sqrt(2*np.log(t)/totals), 0.25 ) * np.log(t)/totals )
return np.argmax(UCB)
# the UCB algorithm - using fixed 95% confidence intervals
# see slide 8 for details:
def UCB_normal(observed_data):
totals = observed_data.sum(1) # totals
successes = observed_data[:,0] # successes
estimated_means = successes/totals # sample mean
estimated_variances = estimated_means - estimated_means**2
UCB = estimated_means + 1.96*np.sqrt(estimated_variances/totals)
return np.argmax(UCB)
# Thompson Sampling
def thompson_sampling(observed_data):
return np.argmax( np.random.beta(observed_data[:,0], observed_data[:,1]) )
# the bandit algorithm
def run_bandit_alg(true_rewards,CTRs_that_generated_data,choice_func):
num_samples,K = true_rewards.shape
# seed the estimated params
prior_a = 1. # aka successes
prior_b = 1. # aka failures
observed_data = np.zeros((K,2))
observed_data[:,0] += prior_a # allocating the initial conditions
observed_data[:,1] += prior_b
regret = np.zeros(num_samples)
for i in range(0,num_samples):
# pulling a lever & updating observed_data
this_choice = choice_func(observed_data)
# update parameters
if true_rewards[i,this_choice] == 1:
update_ind = 0
update_ind = 1
observed_data[this_choice,update_ind] += 1
# updated expected regret
regret[i] = np.max(CTRs_that_generated_data[i,:]) - CTRs_that_generated_data[i,this_choice]
cum_regret = np.cumsum(regret)
return cum_regret
# define number of samples and number of choices
num_samples = 10000
K = 5
number_experiments = 100
regret_accumulator = np.zeros((num_samples,5))
for i in range(number_experiments):
print "Running experiment:", i+1
true_rewards,CTRs_that_generated_data = generate_bernoulli_bandit_data(num_samples,K)
regret_accumulator[:,0] += run_bandit_alg(true_rewards,CTRs_that_generated_data,random)
regret_accumulator[:,1] += run_bandit_alg(true_rewards,CTRs_that_generated_data,naive)
regret_accumulator[:,2] += run_bandit_alg(true_rewards,CTRs_that_generated_data,epsilon_greedy)
regret_accumulator[:,3] += run_bandit_alg(true_rewards,CTRs_that_generated_data,UCB)
regret_accumulator[:,4] += run_bandit_alg(true_rewards,CTRs_that_generated_data,UCB_normal)
plt.title('Simulated Bandit Performance for K = 5')
plt.ylabel('Cumulative Expected Regret')
plt.xlabel('Round Index')
plt.legend(('Random','Naive','Epsilon-Greedy','(1 - 1/t) UCB','95% UCB'),loc='lower right')
// Scala port by Krishnan Raman
import math.random
So there are num_products products in your shop (think amazon, not safeway)
Let num_products ~ [1,10]
You want to show one of num_products product recommendations.
The user either clicks on it or do not, and we log this binary reward.
Next, we proceed to the next user.
Repeat all over again
object MultiArmedBernoulliBandit extends App {
Generate all Bernoulli rewards ahead of time
def generate_bernoulli_bandit_data(num_samples:Int, num_products:Int) = {
// generate as many random numbers as number of products
val x = Seq.fill(num_products)(random)
// make one copy for each user
val CTRs_that_generated_data = Seq.tabulate(num_samples, num_products)((i,j) => x(j))
// for each user, generate as many random numbers as number of products
val users = Seq.fill(num_samples, num_products)(random)
// so the user gets a reward if his random guess is less than the CTR
val true_rewards = Seq.tabulate(num_samples, num_products){
(i,j) => users(i)(j) < CTRs_that_generated_data(i)(j)
(true_rewards, CTRs_that_generated_data)
def shape[T](a:Seq[Seq[T]]):(Int, Int) = (a(0).size, a.size)
def run_bandit_alg(
true_rewards: Seq[Seq[Boolean]],
CTRs_that_generated_data: Seq[Seq[Double]],
choice_func: Array[Array[Int]] => Int ) = {
// extract the # of rows & columns given a double dim array
val (num_products, num_samples) = shape(true_rewards)
//printf("num samples %d num_products %d\n", num_samples, num_products)
// make a num_product X 2 array ie. a 2 column array, with 1 row per product
// on the lhs are auccesses
// on the rhs are failures
// for every user, the MAB algo picks a product
// if the product it picks is the one the user wants, we increment the product's success column
// else we increment failure
val observed_data = Array.fill(num_products, 2)(1)
// every user has a regret number
// that number is the max ctr of user minus the ctr for his choice
// if the choice is the one that had the max ctr, the regret is zero
val regret = Array.fill(num_samples)(0.0)
for(i <- 0 until num_samples) {
// pulling a lever & updating observed_data
val this_choice = choice_func(observed_data)
// update parameters
val update_ind = if (true_rewards(i)(this_choice)) 0 else 1
observed_data(this_choice)(update_ind) += 1
// updated expected regret
regret(i) = CTRs_that_generated_data(i).max - CTRs_that_generated_data(i)(this_choice)
regret.scanLeft(0.0)((a,b)=>a+b).tail // cumulative regret
def totallyRandom(observed_data: Array[Array[Int]]):Int = util.Random.nextInt(observed_data.size)
// define number of samples and number of choices
def num_samples = 10
def num_products = 5
def number_experiments = 1
for (i<- 0 until number_experiments) {
printf("Running experiment: %d\n", i+1)
val precomputed = generate_bernoulli_bandit_data(num_samples, num_products)
val true_rewards = precomputed._1
val CTRs_that_generated_data = precomputed._2
val cum_regret = run_bandit_alg(true_rewards,CTRs_that_generated_data,totallyRandom _)
println("Cumulative Regret:" + cum_regret.mkString(","))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment