Skip to content

Instantly share code, notes, and snippets.

@krishnanraman
Last active December 15, 2015 13:39
Show Gist options
  • Save krishnanraman/5268832 to your computer and use it in GitHub Desktop.
Save krishnanraman/5268832 to your computer and use it in GitHub Desktop.
Histogram of Normal distribution
import com.twitter.scalding._
import util.Random
// Produces a 10-bin histogram of n samples drawn from the Gaussian distribution N(5, 1): mean 5, standard deviation 1.
class HistogramTest(args : Args) extends Job(args) {
  /** Packs the first 10 elements of `x` into a Tuple10 so that `mapTo` can emit
    * them as 10 separate output fields. Throws if `x` has fewer than 10 elements.
    */
  def l2t10(x:List[Int]) = Tuple10(x(0), x(1),x(2),x(3), x(4),x(5),x(6), x(7),x(8),x(9))

  // Optional `--seed <long>` makes the generated sample (and so the histogram)
  // reproducible; without it the generator is unseeded, as before.
  private val rng = args.optional("seed").map(s => new Random(s.toLong)).getOrElse(new Random)

  /** `--n` samples from N(5, 1): a standard normal draw shifted by 5. */
  val tuples = (1 to args("n").toInt).map(_ => rng.nextGaussian + 5)

  /** Number of equal-width bins. Must stay in sync with `l2t10`, which fixes the
    * output arity at 10; changing this value alone will not work.
    */
  val bins = 10

  /** Output field names 'b1 .. 'b10, one count column per bin. */
  val bin_names = (1 to bins).map(i => Symbol("b" + i))

  /** Every sample, in a single field 'n. */
  val pipe1 = IterableSource(tuples, 'n).read

  /** A single row with the overall minimum ('mymin) and maximum ('mymax) sample. */
  val pipe2 = pipe1.groupAll {
    _.min('n -> 'mymin)
     .max('n -> 'mymax)
  }

  // Attach min/max to every sample, one-hot encode the sample's bin index, then
  // sum each bin column over all rows to obtain the per-bin counts.
  pipe1.crossWithSmaller(pipe2)
    .mapTo(('n, 'mymin, 'mymax) -> bin_names) { x: (Double, Double, Double) =>
      val (n, min, max) = x
      val binwidth = (max - min) / bins.toDouble
      // idx = the bin that n falls into. E.g. min = 1, max = 11, bins = 10 gives
      // binwidth = 1: [1,2) -> bin 0, [2,3) -> bin 1, ..., [10,11] -> bin 9.
      // The maximum itself computes to index `bins`, so it is clamped into the last bin.
      // When all samples are equal, binwidth is 0 and (n-min)/binwidth would be
      // 0/0 = NaN; place everything in bin 0 explicitly rather than relying on NaN.toInt == 0.
      val idx =
        if (binwidth == 0.0) 0
        else math.min(math.floor((n - min) / binwidth).toInt, bins - 1)
      l2t10(List.tabulate[Int](bins)(b => if (b == idx) 1 else 0))
    }
    // Sum every bin column; derived from bin_names so the field list lives in one place.
    .groupAll { gb => bin_names.foldLeft(gb)((acc, name) => acc.sum(name)) }
    .write(Tsv("data/histo_test"))
}
[tw-mbp13-kraman scala (kraman/pctr-histogram)]$ scald --local HistogramTest.scala --n 1000000
[INFO] Found Job Class: HistogramTest
13/03/28 21:43:35 INFO property.AppProps: using app.id: 58F7CD892A31367BA4D0B5D5A9732752
13/03/28 21:44:34 INFO util.Version: Concurrent, Inc - Cascading 2.1.6-wip-114
13/03/28 21:44:34 INFO flow.Flow: [HistogramTest] starting
13/03/28 21:44:34 INFO flow.Flow: [HistogramTest] source: MemoryTap["TextDelimited[['n']]"]["0.4327983144253499"]
13/03/28 21:44:34 INFO flow.Flow: [HistogramTest] sink: FileTap["TextDelimited[[UNKNOWN]->[ALL]]"]["data/histo_test"]
13/03/28 21:44:34 INFO flow.Flow: [HistogramTest] parallel execution is enabled: true
13/03/28 21:44:34 INFO flow.Flow: [HistogramTest] starting jobs: 1
13/03/28 21:44:34 INFO flow.Flow: [HistogramTest] allocating threads: 1
13/03/28 21:44:34 INFO flow.FlowStep: [HistogramTest] starting step: local
13/03/28 21:44:34 INFO assembly.AggregateBy: using threshold value: 100000
13/03/28 21:44:34 INFO assembly.AggregateBy: using threshold value: 100000
[tw-mbp13-kraman scala (kraman/pctr-histogram)]$ cat data/histo_test
57 1855 22917 127976 314627 336822 159652 32969 3019 106
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment