Last active January 22, 2018 00:38
AppSessions That Runs On Spark Shell. See issue with quantiles:
import com.github.nscala_time.time.Imports.{LocalDate, Period}
import com.twitter.algebird.{QTree, QTreeSemigroup}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import scopt.OptionParser
import scala.collection.immutable.HashMap
* This job computes the following session-related metrics for app pageviews:
* Number of sessions per user
* Number of pageviews per session
* Session length (gap between first and last pageview, in milliseconds)
* For each metric, the following stats are computed:
* Minima
* Maxima
* Quantiles List(.1, .5, .9, .99)
* Usage with spark-submit
* spark-submit \
* --class
* /path/to/refinery-job.jar
* -o <output-dir> -y <year> -m <month> -d <day> [-p <period-days> -w <webrequest-base-path> -n <num-partitions>]
* spark-submit \
* --class
* --num-executors=16 --executor-cores=1 --executor-memory=2g
* /path/to/refinery-job.jar -h hdfs://analytics-hadoop -o /tmp/mobile-apps-sessions
* -y 2015 -m 3 -d 30 [-p 30 -n 8]
* The metrics are stored in output_directory/session_metrics.tsv, and also exposed through a
* hive external table at wmf.mobile_apps_session_metrics
* Empty list of sessions
* To be used as zero value for the sessionize function.
// Zero value for folding timestamps through sessionize: no sessions yet.
val emptySessions: List[List[Long]] = Nil
* List of os families to breakdown.
// OS families for which a separate per-OS breakdown is produced.
val osFamilies: List[String] = "Android" :: "iOS" :: Nil
* Quantiles
* @param nums RDD of values.
* @param qs List of q values to calculate quantiles for.
* @return A list containing the quantile (lowBound, highBound) for each q.
// Computes, for every q in `qs`, a (lowBound, highBound) pair for that quantile of `nums`,
// using an Algebird QTreeSemigroup to combine the values.
// NOTE(review): the constructor argument 8 is presumably the QTree precision
// parameter — confirm against the Algebird documentation.
def quantiles(nums: RDD[Long], qs: List[Double]): List[(Double, Double)] = {
val qtSemigroup = new QTreeSemigroup[Long](8)
// NOTE(review): the expression assigned to `sum`, the per-q quantile lookup and the
// closing brace are not present here — restore them before running this script.
val sum =
* Session logic
* @param sessions List of sessions. Each session is represented
* as an ordered list of pageview timestamps.
* Like: List(List(ts1, ts2), List(ts3, ts4), List(ts5))
* This would be empty on 1st pass for a uuid
* @param timestamp The pageview timestamp to be merged to the
* session list. It is assumed to be greater than
* all the previous timestamps in the session list.
* So the list we are folding needs to be ordered min to max.
* If the gap between two pageviews is bigger than 30 mins (1800 secs),
* the pageview starts a new session.
* @return The list of sessions including the new pageview timestamp.
* Depending on the time passed since last pageview,
* The timestamp will be allocated as part of the last session
* or in a new session.
/**
 * Folds one pageview timestamp into a user's list of sessions.
 *
 * @param sessions  Sessions built so far, oldest first. Each session is an
 *                  ascending list of pageview timestamps (in seconds).
 *                  Empty on the first pageview of a user.
 * @param timestamp Pageview timestamp to add. Expected to be greater than or
 *                  equal to every timestamp already present in `sessions`.
 * @return The sessions including `timestamp`: appended to the last session when
 *         it is at most 1800 seconds after that session's last pageview,
 *         otherwise placed in a new trailing session.
 */
def sessionize(sessions: List[List[Long]], timestamp: Long): List[List[Long]] = {
  // Maximum gap, in seconds, between consecutive pageviews of one session (30 minutes).
  val sessionGap = 1800L

  sessions.lastOption match {
    // First pageview seen for this user.
    case None => List(List(timestamp))
    case Some(current) =>
      // An empty trailing session has no last pageview to compare against,
      // so the timestamp simply joins it instead of failing on `.last`.
      val joinsCurrent = current.lastOption.forall(timestamp <= _ + sessionGap)
      if (joinsCurrent) sessions.init :+ (current :+ timestamp)
      else sessions :+ List(timestamp)
  }
}
* Computes statistics for a metric
* @param nums RDD of values.
* @return Hashmap of different stats computed for a given metric
// Gathers summary statistics (count, min, max and four quantile bounds) for `nums`.
// NOTE(review): the expression that wraps the tuples below (presumably a HashMap(...)
// literal) and the closing brace are not present here — confirm against the original job.
def statsPerMetric(nums: RDD[Long]) = {
// q values handed to quantiles(); percentiles(i) is used as the result for qs(i)
val qs = List(.1, .5, .9, .99)
val percentiles = quantiles(nums, qs)
("count", nums.count),
( "min" , nums.min),
("max" , nums.max),
// NOTE(review): this key is named percentile_1 but holds the result for q = .1
("percentile_1" , percentiles(0)),
("percentile_50" , percentiles(1)),
("percentile_90" , percentiles(2)),
("percentile_99" , percentiles(3))
* Compute stats for the different session metrics given all user sessions data
* @param userSessions UserSessions is of this form:
* [((String, String), List[List[Long]])] = (
* ((Android, 3e92cbf4-d204-4a59-a12d-dde6e337902d), List(List(1426809630), List(1426812577))),
* ((iOS, bf78563d-9173-415c-ba4f-6848ab46afc3), List(List(1426810256, 1426810264))),
* ((Android, 896c399c-0dc1-4835-aa66-6d619b83a76b), List(List(1426810619, 1426811555))),
* ((iOS, 53d91d28-158f-4dcf-8561-5a3d6e90e18e), List(List(1426810894, 1426810915)))
* )
* @param osFilter [Optional] If present, it will consider only the uuids
* that belong to the given os_family. It will also output
* this parameter as the second value of the output's tuples.
* If not present (default), it will consider all uuids and
* output null as second value in the output's tuples.
* @return List of tuples in the form of [(SessionMetricName, osFamily, SessionMetricStats), ... ]
// Computes the stats of the three session metrics (SessionsPerUser, PageviewsPerSession,
// SessionLength) for all users, or only for the users of one os family.
// osFilter == null means "no filter"; it is also copied into every output tuple.
// NOTE(review): several tokens are not present in this block — the then-branch of the
// filter, the receivers of the two `=> r...` lambdas, the wrapper around the three result
// tuples and the closing braces. Restore them from the original job before running.
def allSessionMetricsStats(
userSessions: RDD[((String, String), List[List[Long]])],
osFilter: String = null
): List[(String, String, Map[String, Any])] = {
// Keep only the wmfuuids that belong to the given os family, if one was requested.
// The key is (os_family, wmfuuid), so _._1._1 is the os family.
val filteredSessions = if (osFilter == null) {
} else {
userSessions.filter(_._1._1 == osFilter)
// calculate number of sessions per user (length of each user's session list)
val sessionsPerUser = statsPerMetric( => r._2.length.toLong))
// flatten: (key, List(session1, session2, ...) -> session*
// RDD[List[Long]]] =
// List(1426810256, 1426810264),
// List(1426811100), List(1426810253)
// List(1426810619, 1426811555), List(1426810894, 1426810915),
val sessions = filteredSessions.flatMap(_._2)
// calculate number of pageviews per session (length of each session)
val pageviewsPerSession = statsPerMetric( => r.length.toLong))
// calculate session length as last timestamp minus first timestamp
// sessions with only one pageview are not counted
val sessionLength = statsPerMetric(sessions
// Drop empty sessions and sessions with just 1 timestamp
.filter(s => !s.isEmpty && !s.tail.isEmpty)
.map(r => r.last - r.head))
("SessionsPerUser", osFilter, sessionsPerUser),
("PageviewsPerSession", osFilter, pageviewsPerSession),
("SessionLength", osFilter, sessionLength)
* Given stats map returns stats ready to be printed
* @return A tab separated string for given metric, looks like
* 2015 6 2 2015-6-1 -- 2015-6-1 SessionsPerUser 1259304 1 15 (1.0,2.0) (1.0,2.0) (2.0,3.0) (5.0,6.0)
* If the data is broken down by os family, adds a corresponding column at the end.
/**
 * Formats the stats of one metric as a single tab-separated line.
 *
 * Columns, in order: four literal "some" placeholders, the metric name
 * (immediately followed by a tab and the os family when one is given), then
 * count, min, max and the percentile_1 / _50 / _90 / _99 values.
 *
 * @param stats     Stats of the metric. Must contain the keys count, min, max,
 *                  percentile_1, percentile_50, percentile_90 and percentile_99;
 *                  a missing key raises NoSuchElementException.
 * @param statsType Name of the metric, e.g. SessionsPerUser.
 * @param osFamily  Os family of the breakdown, or null when not broken down.
 * @return The tab-separated line, without a trailing newline.
 */
def statsToString(
  stats: Map[String, Any],
  statsType: String,
  osFamily: String
): String = {
  // The os column is only emitted for per-OS breakdowns (osFamily != null).
  val osString = if (osFamily == null) "" else "\t" + osFamily
  // The first four columns are String placeholders, so they must be rendered with %s:
  // %d throws IllegalFormatConversionException when given a String.
  val outputStats = "%s\t%s\t%s\t%s\t%s%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s"
    .format("some", "some", "some", "some",
      statsType, osString, stats("count"), stats("min"), stats("max"),
      stats("percentile_1"), stats("percentile_50"), stats("percentile_90"),
      stats("percentile_99"))
  outputStats
}
* Makes a printable string with all the stats
* @param sessionStatsData List of tuples in form of
* [(SessionMetricName, osFamily, SessionMetricStats), ...]
* @return String with 1 line for every metric, separated by newline
// Turns the (metricName, osFamily, stats) tuples into printable text by passing
// each tuple to statsToString.
// NOTE(review): the receiver of the `=> statsToString(` lambda, its arguments, the way
// the resulting lines are joined and the closing brace are not present here — confirm
// against the original job.
def printableStats(
sessionStatsData: List[(String, String, Map[String, Any])]
): String = { => statsToString(
* Fetch select columns from given list of Parquet files
* @param paths List of parquet file paths to load,
* Like: ["hdfs://../year=2015/month=5/day=5",
* "hdfs://../year=2015/month=5/day=6",...]
* @param sqlContext SQL Context
* @return DataFrame with 3 columns: os_family, wmfuuid and timestamp
// Loads the given paths through `sqlContext` and narrows the data to app pageviews.
// The visible fragments keep rows with access_method = 'mobile app', a non-null and
// non-empty x_analytics_map['wmfuuid'] and a non-null ts, and select three columns:
// os_family, wmfuuid and ts (cast to int).
// NOTE(review): the read call that consumes `paths` (only `_*)` is left of it), the
// start of the filter condition, the string delimiters and the closing brace are not
// present here — restore them from the original job before running.
def pathListToDataframe(paths: List[String], sqlContext: SQLContext): DataFrame = { _*)
and access_method = 'mobile app'
and x_analytics_map['wmfuuid'] is not null
and x_analytics_map['wmfuuid'] != ''
and ts is not null
"user_agent_map['os_family'] as os_family",
"x_analytics_map['wmfuuid'] as wmfuuid",
"CAST(ts AS int) as ts"
* Compute sessions by user
* @param userSessionsAll DataFrame with os_family, wmfuuid and timestamp columns
* @param numPartitions Number of partitions for the output RDD
* @return userSessions RDD in the form of
* ((os_family, wmfuuid), List(session1, session2, ...))
// Groups pageview timestamps by (os_family, wmfuuid) and splits each user's
// timestamps into sessions with sessionize.
// NOTE(review): the receiver of the first .map (presumably the rows of userSessionsAll)
// and the call that takes the three functions below (they look like the createCombiner /
// mergeValue / mergeCombiners arguments of combineByKey) are not present here, nor is
// the closing brace — confirm against the original job.
def userSessions(userSessionsAll: DataFrame, numPartitions: Int): RDD[((String, String), List[List[Long]])] = {
// format the line as: key=(os_family, wmfuuid), val=timestamp
.map(r => ((r.getString(0), r.getString(1)), r.getInt(2).toLong))
// aggregate key to list of timestamps (key, timestamp)* -> (key, List(ts1, ts2, ts3, ...))
(t: Long) => List(t),
(l: List[Long], t: Long) => (t +: l),
// merge partial lists by plain concatenation; no ordering is established here
(l1: List[Long], l2: List[Long]) => (l1 ++ l2),
numPartitions = numPartitions)
// sort each user's timestamps ascending, then fold them through sessionize:
// (key, List(ts1, ts2, ts3, ...)) -> (key, List(List(ts1, ts2), List(ts3), ...)
.map { case (key, listOfTimestampLists) => key -> listOfTimestampLists.sorted.foldLeft(emptySessions)(sessionize) }
* Save output stats to HDFS
// Writes `currentStats` to `outputFile`. When the file already exists, its records
// that do not contain `reportDateRange` are kept and combined with the new stats.
// NOTE(review): the calls that actually save the data in both branches, the step that
// turns `pastStats` into a string, and the closing braces are not present here —
// restore them from the original job before running.
def saveStats(sc: SparkContext, outputFile: String, currentStats: String, reportDateRange: String) = {
val hadoopConf = new org.apache.hadoop.conf.Configuration()
// Resolve the file system from the output path itself
val hdfs = new Path(outputFile).getFileSystem(hadoopConf)
// Convert current stats to string
val path = new Path(outputFile)
if (hdfs.exists(path)) {
// Read data file and delete records from this date range if any
// and convert the filtered data into a string.
// The match is a plain substring test on each record.
val pastStats = sc.textFile(outputFile)
.filter(record => !record.contains(reportDateRange))
// Concatenate with current output stats
val currentAndPastData = "%s\n%s".format(pastStats, currentStats)
// 3. delete old file in hadoop
try {
hdfs.delete(path, true)
} catch {
// NOTE(review): every failure of the delete, including fatal errors, is silently ignored
case _: Throwable => {}
// Save the combined data
} else {
//Save only current data
* Config class for CLI argument parser using scopt
// Initial setup - Spark, SQLContext
//val conf = new SparkConf().setAppName("AppSessionMetrics")
// Spark configuration: application name plus Kryo serialization with a 24 MB buffer.
// Trailing dots keep the chained calls in one statement when pasted into the shell.
val conf = new SparkConf().
  setAppName("AppSessionMetrics").
  set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
  set("spark.kryoserializer.buffer.mb", "24")
// The shell already provides `sc`, so no SparkContext is created from `conf` here.
//val sc = new SparkContext(conf)
// SQL context on top of the shell's SparkContext; Parquet output is snappy-compressed.
val sqlContext = new SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")
// Generate a list of all parquet file paths to read given the webrequest base path,
// and all dates related information. NOTE: As of January 2016,
// mobile web caches have been merged with text, so webrequest_source=text.
//val webrequestTextPath = "hdfs://analytics-hadoop/wmf/data/wmf/webrequest/webrequest_source=text"
// Helper hashmap with all date related information to avoid passing around lots of params
//val datesInfo = HashMap("year" -> 2018, "month" -> 1, "day" -> 9, "periodDays" -> 2)
// List of path strings like [".../day=1", ".../day=2"]
//val webrequestPaths = dateRangeToPathList("", datesInfo)
//val webrequestPaths = List("hdfs://analytics-hadoop/wmf/data/wmf/webrequest/webrequest_source=text/year=2018/month=1/day=9/")
// Get sessions data for all users, calculate stats for different metrics,
// and get the stats in a printable string format to output
// Driver: load one hour of webrequest data, build the per-user sessions, compute the
// stats of every metric and save them as text.
// NOTE(review): the read call in front of the path below, the start of the filter
// condition and the string delimiters around the SQL fragments are not present here —
// restore them before running.
val data ="hdfs://analytics-hadoop/wmf/data/wmf/webrequest/webrequest_source=text/year=2018/month=1/day=9/hour=1").
and access_method = 'mobile app'
and x_analytics_map['wmfuuid'] is not null
and x_analytics_map['wmfuuid'] != ''
and ts is not null
"user_agent_map['os_family'] as os_family",
"x_analytics_map['wmfuuid'] as wmfuuid",
"CAST(ts AS int) as ts"
// Sessions per (os_family, wmfuuid) in 8 partitions; cached because it is reused below
val userSessionsData = userSessions(data, 8).cache()
// update per OS
// NOTE(review): the left part of this expression is not present; the visible tail
// concatenates several lists of metric stats into one list.
val allMetricsStats =, _)).fold(List.empty)(_++_)
val outputStats = printableStats(allMetricsStats)
//Save output to file
val outputFile = "/users/nuria/app-sessions/session_metrics.tsv"
// "2018-01-09" is the date range whose existing records are replaced in the output file
saveStats(sc, outputFile, outputStats, "2018-01-09")
