Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@nuria
Created October 2, 2020 20:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nuria/499fe3b4373a9ae640ada5a11f82cb41 to your computer and use it in GitHub Desktop.
sessions.scala
// spark-shell --jars /home/otto/algebird-core_2.10-0.9.0.jar,/home/mforns/refinery-core-0.0.9.jar
import java.util.Date
import java.text.SimpleDateFormat
import org.wikimedia.analytics.refinery.core.PageviewDefinition
import org.wikimedia.analytics.refinery.core.Webrequest
import scala.math.pow
import org.apache.spark.rdd.RDD
import com.twitter.algebird.QTree
import com.twitter.algebird.QTreeSemigroup
// helper methods
// Parser for webrequest `dt` timestamp strings, e.g. "2015-03-20T00:12:34".
// Without an explicit time zone, SimpleDateFormat parses in the JVM's local
// zone, shifting every epoch value by the local UTC offset (and by DST).
// NOTE(review): assumes webrequest `dt` is UTC — confirm against the schema.
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
dateFormat.setTimeZone(java.util.TimeZone.getTimeZone("UTC"))
// Parse a `dt` string into epoch milliseconds.
def ms (s:String) : Long = { dateFormat.parse(s).getTime() }
// Downcast a Row field to String; fails at runtime if the field is not a String.
def s (a:Any) : String = { a.asInstanceOf[String] }
// statistical methods
/**
 * Geometric mean of an RDD of positive Long values.
 *
 * Computed in log space as exp(mean(log(x))): the previous
 * fold(1)(_ * _) multiplied all values into a single Long, which
 * overflows for all but tiny inputs and silently returns garbage.
 * Assumes every value is > 0 (log of 0 or negatives yields NaN/-Inf).
 *
 * @param nums non-empty RDD of positive values
 * @return the geometric mean as a Double
 */
def geoMean (nums:RDD[Long]) : Double = {
math.exp(nums.map(x => math.log(x.toDouble)).sum / nums.count)
}
/**
 * Prints summary statistics for an RDD of Long values: count, geometric
 * mean, mean, min, max, and approximate .25/.50/.75 quantile bounds.
 *
 * Quantiles are probabilistic, via algebird's QTree (k = 4 gives coarse
 * but cheap buckets). Assumes `nums` is non-empty (reduce would throw).
 *
 * @param nums the values to summarize
 */
def printStats (nums:RDD[Long]) = {
// Build one QTree per value and merge them into a single sketch.
val qtSemigroup = new QTreeSemigroup[Long](4)
val sum = nums.map { QTree(_) }.reduce(qtSemigroup.plus)
// Fixed: was `""""` (four quotes), which prepended a stray `"` to the output.
println("""
count: %s
geoMean: %s
mean: %s
min: %s
max: %s
.25 quantile bounds %s
.50 quantile bounds %s
.75 quantile bounds %s
""".format(
nums.count,
geoMean(nums),
nums.mean,
nums.min,
nums.max,
sum.quantileBounds(.25),
sum.quantileBounds(.5),
sum.quantileBounds(.75)
))
}
/**
 * Zero value for the sessionize foldLeft: no sessions seen yet.
 */
val emptySessions = List.empty[List[Long]]
/**
 * Allocates a pageview timestamp into a list of sessions.
 *
 * A session is an ordered list of pageview timestamps. The incoming
 * timestamp is assumed to be >= every timestamp already present. If it
 * falls within 30 minutes (1800000 ms) of the last pageview of the last
 * session, it extends that session; otherwise it opens a new one.
 *
 * @param sessions Sessions accumulated so far (possibly empty).
 * @param timestamp The new pageview timestamp to place.
 *
 * @return The session list including the new timestamp.
 */
def sessionize (sessions:List[List[Long]], timestamp:Long) : List[List[Long]] = {
sessions match {
// first pageview ever: open the first session
case Nil => List(List(timestamp))
// within 30 min of the last pageview: extend the current session
case earlier :+ current if timestamp <= current.last + 1800000 =>
earlier :+ (current :+ timestamp)
// gap too large: start a fresh session
case _ =>
sessions :+ List(timestamp)
}
}
// setup sql engine: SQLContext over the spark-shell's SparkContext (sc)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Load one hour of mobile webrequest parquet data and expose it to SQL
// under the name "webrequest".
// NOTE(review): parquetFile/registerTempTable are the Spark 1.x API.
sqlContext.parquetFile(
"/wmf/data/wmf/webrequest/webrequest_source=mobile/year=2015/month=3/day=20/hour=0/000000_0"
).registerTempTable("webrequest")
// compute sessions by user: RDD of (uuid, List(session1, session2, ...))
val userSessions = sqlContext.
// get webrequest pageview rows; the SELECT column order fixes the
// positional indices r(0)..r(5) used in the steps below
sql("""SELECT uri_path, uri_query, content_type, user_agent, x_analytics, dt
FROM webrequest WHERE is_pageview = TRUE""").
// filter app pageviews
// (r(0)=uri_path, r(1)=uri_query, r(2)=content_type, r(3)=user_agent)
filter(r => PageviewDefinition.getInstance.isAppPageview(s(r(0)), s(r(1)), s(r(2)), s(r(3)))).
// map: pageview -> (uuid, timestamp)
// uuid is read from the x_analytics field; dt is parsed to epoch ms
map(pv => (Webrequest.getInstance.getXAnalyticsValue(s(pv(4)), "uuid"), ms(s(pv(5))))).
// aggregate: (uuid, timestamp)* -> (uuid, List(ts1, ts2, ts3, ...))
// createCombiner / mergeValue / mergeCombiners for combineByKey;
// NOTE(review): per-partition order is not guaranteed, hence the
// explicit .sorted before sessionizing below
combineByKey(
List(_),
(l:List[Long], t:Long) => l :+ t,
(l1:List[Long], l2:List[Long]) => l1 ::: l2
).
// sample sessions to 10% (without replacement)
sample(false, 0.1).
// map: (uuid, List(ts1, ts2, ts3, ...)) -> (uuid, List(List(ts1, ts2), List(ts3), ...))
map(p => (p._1, p._2.sorted.foldLeft(emptySessions)(sessionize)))
// flatten: (uuid, List(session1, session2, ...)) -> one record per session
val sessions = userSessions.flatMap(_._2)
// metrics
// Sessions per user: one data point per uuid.
// Fixed: numeric widening via .toLong instead of asInstanceOf[Long] —
// asInstanceOf is a cast, not a conversion, and only happens to work
// here through compiler special-casing of primitives.
val sessionsPerUser = userSessions.map(r => { r._2.length.toLong })
// Pageviews per session: one data point per session.
val pageviewsPerSession = sessions.map(r => { r.length.toLong })
// Session length in ms (last minus first pageview); single-pageview
// sessions have no duration and are excluded.
val sessionLength = sessions.filter(r => { r.length > 1 }).map(r => { r.last - r.head })
// output: print summary stats for each of the three metrics
printStats(sessionsPerUser)
printStats(pageviewsPerSession)
printStats(sessionLength)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment