Skip to content

Instantly share code, notes, and snippets.

@DavidRdgz
Last active August 23, 2018 18:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DavidRdgz/fd39220070b52d83e70fa2041f63419d to your computer and use it in GitHub Desktop.
Save DavidRdgz/fd39220070b52d83e70fa2041f63419d to your computer and use it in GitHub Desktop.
[Rainier] Massive Bayesian Inference in Spark using Rainier
import com.stripe.rainier.core.{Normal, Poisson}
import com.stripe.rainier.sampler.{RNG, ScalaRNG}
import org.apache.spark.{SparkConf, SparkContext}
object Driver {

  /**
   * Fixed-seed RNG picked up implicitly by Rainier sampling in [[fitPoisson]].
   *
   * NOTE(review): this is a single instance per JVM, so in `local[N]` mode every task
   * samples from the same RNG; task interleaving may therefore still vary results
   * between runs — confirm whether per-partition seeding is wanted.
   */
  implicit val rng: RNG = ScalaRNG(1527608515939L)

  /** Number of leading samples of each chain discarded as burn-in before averaging. */
  val DROP_BURN_IN: Int = 100

  /**
   * Wraps `f` in a chill `MeatLocker` and returns a function that delegates to it.
   *
   * The returned closure captures only the locker rather than `f` itself. This works
   * around Spark's "Task not serializable" error when shipping methods/objects to
   * executors; see
   * https://stackoverflow.com/questions/22592811/task-not-serializable-java-io-notserializableexception-when-calling-function-ou#22596875
   *
   * @param f the function to make shippable
   * @tparam A input type
   * @tparam B output type
   * @return a function equivalent to `f`
   */
  def genMapper[A, B](f: A => B): A => B = {
    val locker = com.twitter.chill.MeatLocker(f)
    x => locker.get.apply(x)
  }

  /**
   * Arithmetic mean of `l`, i.e. `sum / size`.
   *
   * @param l the values to average
   * @return the mean, or `Double.NaN` when `l` is empty (the mean is undefined, e.g. when
   *         burn-in removed every sample)
   */
  def average(l: List[Double]): Double =
    if (l.isEmpty) Double.NaN
    else l.sum / l.size

  /**
   * Discards the first `dropBurn` samples of a chain.
   *
   * @param dropBurn number of leading samples to drop (values <= 0 drop nothing)
   * @param v        the sampled chain
   * @return the remaining samples; empty if `dropBurn >= v.size`
   */
  def dropBurnIn(dropBurn: Int)(v: List[Double]): List[Double] =
    v.drop(dropBurn)

  /**
   * Samples the posterior of a Poisson rate given observed counts `y`.
   *
   * The rate `r` has a `Normal(5, 10)` prior, and `Poisson(r)` is fit to `y`. The result is
   * the list of sampled values of `r` returned by Rainier's `sample()`, using its default
   * sampler settings. Burn-in is dropped separately by [[dropBurnIn]] in `main`.
   *
   * NOTE(review): a Normal prior places mass on negative rates, which are not valid for a
   * Poisson; a positive-support prior may be intended — confirm.
   *
   * @param y observed counts
   * @return sampled values of the rate parameter
   */
  def fitPoisson(y: List[Int]): List[Double] = {
    val rate = for {
      r <- Normal(5, 10).param
      _ <- Poisson(r).fit(y) // conditions the model on y; the fitted value itself is unused
    } yield r
    rate.sample()
  }

  /**
   * Fits an independent Poisson model to each hard-coded dataset in parallel on Spark, and
   * prints the posterior-mean rate of each after dropping [[DROP_BURN_IN]] samples.
   */
  def main(args: Array[String]): Unit = {
    // setIfMissing (rather than setMaster) keeps local[8] as the default while still
    // honouring a master supplied via spark-submit / spark.master, which setMaster would
    // override because SparkConf settings take highest precedence.
    val conf = new SparkConf()
      .setAppName("Fit Poisson to Data")
      .setIfMissing("spark.master", "local[8]")
    val sc = new SparkContext(conf)
    try {
      val data = sc.parallelize(
        Array(
          List(0, 1, 5, 0, 3, 5, 0, 3, 6, 0, 0, 3, 0, 6, 4, 0),
          List(0, 1, 0, 2, 0, 3, 2, 0, 1, 2, 0, 1, 0, 0, 0, 1),
          List(6, 8, 5, 0, 6, 6, 7, 0, 5, 6, 8, 6, 7, 0, 5, 5),
          List(6, 4, 0, 4, 7, 5, 0, 5, 7, 0, 5, 8, 0, 5, 7, 0),
          List(1, 2, 1, 3, 2, 1, 3, 1, 0, 2, 0, 1, 3, 2, 0, 1)
        )
      )
      data
        .map(genMapper(fitPoisson))
        .map(dropBurnIn(DROP_BURN_IN))
        .map(average)
        .collect()
        .foreach(println)
    } finally {
      // Always release the SparkContext, even if the job fails.
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment