@ottomata
Created March 26, 2015 18:29
I have no idea what I'm doing here!
// spark-shell --jars /home/otto/algebird-core_2.10-0.9.0.jar,/home/mforns/refinery-core-0.0.9.jar
import java.util.Date
import java.text.SimpleDateFormat
import org.wikimedia.analytics.refinery.core.PageviewDefinition
import org.wikimedia.analytics.refinery.core.Webrequest
import scala.math.{exp, log}
import org.apache.spark.rdd.RDD
import com.twitter.algebird.QTree
import com.twitter.algebird.QTreeSemigroup
// helper methods
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
// parse an ISO-8601 timestamp string into epoch milliseconds
def ms(str: String): Long = dateFormat.parse(str).getTime
// cast an untyped Row field to String
def s(a: Any): String = a.asInstanceOf[String]
// statistical methods
// Geometric mean computed in log space. Multiplying the raw values
// directly (nums.fold(1)(_ * _)) overflows Long, which is likely why
// geoMean printed NaN in the output below.
def geoMean(nums: RDD[Long]): Double = {
  exp(nums.map(n => log(n.toDouble)).mean)
}
def printStats(nums: RDD[Long]) = {
  // QTree gives approximate/probabilistic quantiles; the parameter (4)
  // sets the bucket resolution of the tree
  val qtSemigroup = new QTreeSemigroup[Long](4)
  val sum = nums.map(QTree(_)).reduce(qtSemigroup.plus)
  println("""
    count:                %s
    geoMean:              %s
    mean:                 %s
    min:                  %s
    max:                  %s
    .25 quantile bounds   %s
    .50 quantile bounds   %s
    .75 quantile bounds   %s
  """.format(
    nums.count,
    geoMean(nums),
    nums.mean,
    nums.min,
    nums.max,
    sum.quantileBounds(.25),
    sum.quantileBounds(.5),
    sum.quantileBounds(.75)
  ))
}
/**
 * Empty list of sessions.
 * To be used as the zero value for the sessionize function.
 */
val emptySessions = List[List[Long]]()
/**
 * Session logic.
 *
 * @param sessions  List of sessions. Each session is represented
 *                  as an ordered list of pageview timestamps.
 * @param timestamp The pageview timestamp to be merged into the
 *                  session list. It is assumed to be greater than
 *                  all previous timestamps in the session list.
 *
 * @return The list of sessions including the new pageview timestamp.
 *         Depending on the time passed since the last pageview,
 *         the timestamp is appended to the last session
 *         or starts a new session.
 */
def sessionize(sessions: List[List[Long]], timestamp: Long): List[List[Long]] = {
  if (sessions.isEmpty) List(List(timestamp))
  // 1800000 ms = 30 minutes: the session timeout
  else if (timestamp <= sessions.last.last + 1800000) {
    sessions.init :+ (sessions.last :+ timestamp)
  } else sessions :+ List(timestamp)
}
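// Quick sanity check of sessionize with made-up timestamps (hypothetical
// values, in ms). The third pageview arrives more than 30 minutes after
// the second, so it starts a new session:
assert(
  List(0L, 1000L, 2000000L).foldLeft(emptySessions)(sessionize) ==
    List(List(0L, 1000L), List(2000000L))
)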
// set up the SQL engine (Spark 1.x API)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
sqlContext.parquetFile(
  "/wmf/data/wmf/webrequest/webrequest_source=mobile/year=2015/month=3/day=20/hour=0/000000_0"
).registerTempTable("webrequest")
// compute sessions by user
val userSessions = sqlContext.
  // get webrequest data
  sql("""SELECT uri_path, uri_query, content_type, user_agent, x_analytics, dt
         FROM webrequest WHERE is_pageview = TRUE""").
  // keep only app pageviews
  filter(r => PageviewDefinition.getInstance.isAppPageview(s(r(0)), s(r(1)), s(r(2)), s(r(3)))).
  // map: pageview -> (uuid, timestamp)
  map(pv => (Webrequest.getInstance.getXAnalyticsValue(s(pv(4)), "uuid"), ms(s(pv(5))))).
  // aggregate: (uuid, timestamp)* -> (uuid, List(ts1, ts2, ts3, ...))
  combineByKey(
    List(_),
    (l: List[Long], t: Long) => l :+ t,
    (l1: List[Long], l2: List[Long]) => l1 ::: l2
  ).
  // sample users down to 10% (without replacement)
  sample(false, 0.1).
  // map: (uuid, List(ts1, ts2, ts3, ...)) -> (uuid, List(List(ts1, ts2), List(ts3), ...))
  map(p => (p._1, p._2.sorted.foldLeft(emptySessions)(sessionize)))
// flatten: (uuid, List(session1, session2, ...)) -> session*
val sessions = userSessions.flatMap(_._2)
// metrics
val sessionsPerUser = userSessions.map(_._2.length.toLong)
val pageviewsPerSession = sessions.map(_.length.toLong)
// session length in ms; sessions with a single pageview have no measurable length
val sessionLength = sessions.filter(_.length > 1).map(r => r.last - r.head)
// output
printStats(sessionsPerUser)
printStats(pageviewsPerSession)
printStats(sessionLength)
@ottomata (Author):
    count:                9
    geoMean:              NaN
    mean:                 676000.0
    min:                  21000
    max:                  1621000
    .25 quantile bounds   (97000.0,97001.0)
    .50 quantile bounds   (186000.0,186001.0)
    .75 quantile bounds   (941000.0,941001.0)

This could be a worry:
twitter/algebird#377

But, I really have no idea what I'm doing here. This is my attempt to use QTree to compute quantiles in Spark. I put it together from these examples:

http://twitter.github.io/algebird/index.html#com.twitter.algebird.QTree
https://github.com/twitter/algebird/wiki/Learning-Algebird-Monoids-with-REPL#qtree
https://gist.github.com/MLnick/4945185

I'm pretty sure these are probabilistic quantiles, so they are not exact: quantileBounds returns a (lower, upper) pair that should bracket the true quantile, rather than the quantile itself. I think.
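
For a sense of what quantileBounds returns, here is a minimal local (non-Spark) sketch of the same QTree usage; the values and the larger k parameter are made up for illustration:

    import com.twitter.algebird.{QTree, QTreeSemigroup}

    // five made-up session lengths in ms (hypothetical values)
    val values = List(21000L, 97000L, 186000L, 941000L, 1621000L)

    // k controls bucket resolution: a larger k gives tighter
    // quantile bounds at the cost of a bigger tree
    val qtSemigroup = new QTreeSemigroup[Long](6)
    val qt = values.map(QTree(_)).reduce(qtSemigroup.plus)

    // prints a (lower, upper) pair bracketing the true median
    println(qt.quantileBounds(0.5))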

Check this out too:
http://skipperkongen.dk/2014/08/13/twitter-monoids-in-spark/
