@alaiacano
Created January 25, 2017 18:39
Example of using Algebird to combine multi-armed bandit tests.
import com.twitter.algebird._
import com.twitter.algebird.Operators._
import scala.util.Random
Random.setSeed(0L)
/**
 * An "arm" represents an option in our A/B test.
 *
 * @param name Name of this option
 * @param totalValue The accumulated value that this option has earned. Could be # of clicks, total
 *                   minutes streamed, dollars purchased, etc. It's assumed that more value is good.
 * @param totalPulls Number of times this option has been tried.
 */
case class Arm(name: String, totalValue: Double = 0.0, totalPulls: Int = 0)
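/**
 * An epsilon greedy algorithm.
 *
 * @param arms The "A" and "B" in an A/B test. Each option has a name and accumulated value
 *             (number of times it's been clicked, for example)
 * @param alpha Fraction of the time (between 0 and 1) that we want to use the best performing Arm.
 *              This is the probability of exploiting, i.e. 1 - epsilon in the usual formulation.
 *              No idea why I didn't call this "epsilon" but there's no turning back now!
 */
case class EpsilonGreedy(arms: Seq[Arm], alpha: Double = 0.9) {
  /** Name of the arm to try next: the top arm with probability `alpha`, otherwise a uniformly random arm. */
  def pull: String = {
    if (Random.nextDouble() < alpha) {
      arms.maxBy(_.totalValue).name
    } else {
      arms.toIndexedSeq(Random.nextInt(arms.size)).name
    }
  }

  /** Name of the arm with the highest accumulated value. */
  def topArm: String = arms.maxBy(_.totalValue).name

  /**
   * This is how you report back the result of an experiment. I reuse `Arm` which is maybe not
   * the best idea, but it stores all of the info we need for now so whatevs.
   *
   * @param result Result of a specific test.
   * @return An updated instance of the EpsilonGreedy class.
   */
  def update(result: Arm): EpsilonGreedy = {
    // Caveat: `+` is resolved against the Semigroup[Seq[Arm]] visible at this point in the file.
    // `armSemi` is only defined further down, so this may fall back to Algebird's default
    // Seq concatenation instead of merging arms by name.
    this.copy(arms = arms + Seq(result))
  }
}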
/**
 * How do we combine Arms together? Add the value and # of pulls if it's seen on both sides of the
 * plus, otherwise keep a list of all of the arms we've seen.
 *
 * For example:
 * Seq( Arm("a", 1), Arm("b", 2) ) + Seq(Arm("a", 3)) = Seq(Arm("a", 4), Arm("b", 2))
 */
object ArmSemiGroup extends Semigroup[Seq[Arm]] {
  def plus(l: Seq[Arm], r: Seq[Arm]): Seq[Arm] = {
    // Turn every arm into a single-entry Map keyed by name, then sum the Maps: Algebird's Map
    // semigroup adds the (totalValue, totalPulls) tuples of matching keys element-wise.
    (l.map { a => Map(a.name -> (a.totalValue, a.totalPulls)) } + r.map { a => Map(a.name -> (a.totalValue, a.totalPulls)) })
      .reduce(_ + _)
      .map { case (n, (v, p)) => Arm(n, v, p) }
      .toSeq
  }
}
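// A minimal sketch checking the example from the doc comment above. It assumes only the
// definitions of Arm and ArmSemiGroup; `mergedExample` is a name introduced here for illustration.
// `plus` is called explicitly so the check does not depend on which implicit is in scope.
val mergedExample = ArmSemiGroup.plus(Seq(Arm("a", 1), Arm("b", 2)), Seq(Arm("a", 3)))
// Compare as a Set because the order of the merged arms is not guaranteed.
assert(mergedExample.toSet == Set(Arm("a", 4), Arm("b", 2)))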
// This is important! If this semigroup is not in scope, EGSemigroup.plus will not work right:
// it will concatenate the two arm lists instead of merging arms by name.
implicit val armSemi = ArmSemiGroup
/**
 * How do we combine EpsilonGreedy tests together? We just need to make sure that the value of
 * all arms is added together properly. Then we'll have an updated state of value and # of pulls
 * for each arm.
 */
object EGSemigroup extends Semigroup[EpsilonGreedy] {
  def plus(l: EpsilonGreedy, r: EpsilonGreedy): EpsilonGreedy = {
    // if armSemi is not in scope, the "+" here will be Seq.++
    // but since we've explicitly defined "+" for Seq[Arm] by creating the implicit val armSemi,
    // that will be used instead
    // note: need to figure out which `alpha` to use. It's easy to justify
    // plenty of different things (highest, lowest, average, RuntimeException),
    // but we'll pick the max, which is the most conservative approach
    EpsilonGreedy(l.arms + r.arms, math.max(l.alpha, r.alpha))
  }
}
// Need one of these in scope in order to add EpsilonGreedy's together
implicit val egSemi = EGSemigroup
// We could have also skipped making EGSemigroup and defined egSemi directly like this:
//implicit val egSemi = Semigroup.from{ (l: EpsilonGreedy, r: EpsilonGreedy) => EpsilonGreedy(l.arms + r.arms) }
val egAggregator: Aggregator[Arm, EpsilonGreedy, EpsilonGreedy] = Aggregator
  // Define which semigroup we want to use to combine EpsilonGreedy's together.
  // if only one implicit semigroup is in scope, you don't need this. It's nice to provide it anyway for clarity.
  .fromSemigroup(egSemi)
  // transform an arm into an instance of EpsilonGreedy that has only seen that one arm
  .composePrepare { (arm: Arm) =>
    EpsilonGreedy(Seq(arm))
  }
// optionally, we can use `andThenPresent`, and get the name of the top valued arm back
// this would turn the aggregator into an Aggregator[Arm, EpsilonGreedy, String]
// .andThenPresent { (eg: EpsilonGreedy) => eg.topArm }
// or ask it for the next arm to use:
// .andThenPresent { (eg: EpsilonGreedy) => eg.pull }
////////////////////////////////////////////////////////////////////////////////////
// Now we use it.
////////////////////////////////////////////////////////////////////////////////////
// Our first experiment is run two times, once with arm "a" and once with "c"
val test1 = Seq(
  Arm("a", totalValue=3, totalPulls=1),
  Arm("c", totalValue=2, totalPulls=1)
)
// Our other experiment runs 3 times, with arms "a", "b", and "c", with different results
val test2 = Seq(
  Arm("a", totalValue=0.5, totalPulls=1),
  Arm("b", totalValue=3, totalPulls=1),
  Arm("c", totalValue=2, totalPulls=1)
)
// Now let's aggregate the results to get the overall performance of each test.
// This will convert each Arm into an EpsilonGreedy and then combine them all together.
val firstTest = egAggregator(test1) // best in first test was a
val secondTest = egAggregator(test2) // best in second test was b
/*
firstTest: EpsilonGreedy = EpsilonGreedy(List(Arm(c,2.0,1), Arm(a,3.0,1)),0.9)
secondTest: EpsilonGreedy = EpsilonGreedy(List(Arm(a,0.5,1), Arm(b,3.0,1), Arm(c,2.0,1)),0.9)
*/
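// A sketch of the optional `andThenPresent` variant mentioned where egAggregator is defined.
// It assumes egAggregator, test1 and test2 from above are in scope; `topArmAggregator` is a
// name introduced here for illustration. The result type changes from EpsilonGreedy to String.
val topArmAggregator: Aggregator[Arm, EpsilonGreedy, String] =
  egAggregator.andThenPresent { (eg: EpsilonGreedy) => eg.topArm }
assert(topArmAggregator(test1) == "a") // a has value 3.0, c has 2.0
assert(topArmAggregator(test2) == "b") // b has value 3.0, the highest in test2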
// So now we have the results of two different experiments with different "best arm" results.
// If we were to freeze the state of each Bandit and request the Arm to use 100 times, what distribution of arms would it give us?
(1 to 100).map { i => Map(firstTest.pull -> 1)}.reduce(_ + _) // Map(c -> 6, a -> 94)
(1 to 100).map { i => Map(secondTest.pull -> 1)}.reduce(_ + _) // Map(a -> 3, b -> 96, c -> 1)
// it mostly chooses the one with the highest value. Perfect.
// If we combine the results together and then draw from the aggregated test, what distribution of arms would it give us?
// This uses the implicit egSemi to know how to combine them
val combinedTests = egAggregator(test1) + egAggregator(test2)
/*
combinedTests: EpsilonGreedy = EpsilonGreedy(List(Arm(a,3.5,2), Arm(c,4.0,2), Arm(b,3.0,1)),0.9)
// The best overall was c after combining!
*/
// let's see what the combined MAB would select in 100 pulls
(1 to 100).map { i => Map(combinedTests.pull -> 1)}.reduce(_ + _)
// Map(a -> 4, c -> 94, b -> 2)
// So, we can easily combine the results of two tests by just adding their aggregated values together.
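// A quick sanity check of that claim (a sketch that assumes the definitions above are in scope;
// `allAtOnce` is a name introduced here for illustration). Aggregating every Arm in one pass
// should give the same per-arm totals as aggregating each test separately and adding the results.
val allAtOnce = egAggregator(test1 ++ test2)
// Compare as Sets because the order of the merged arms is not guaranteed.
assert(allAtOnce.arms.toSet == combinedTests.arms.toSet)
assert(allAtOnce.topArm == "c")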