Skip to content

Instantly share code, notes, and snippets.

@jmilagroso
Created April 21, 2017 02:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jmilagroso/9dee75bfaaf80ee27a7e53dc47699601 to your computer and use it in GitHub Desktop.
Save jmilagroso/9dee75bfaaf80ee27a7e53dc47699601 to your computer and use it in GitHub Desktop.
Dynamic mapping of analytics metrics to the queries that fetch their data from HBase, written with the Finatra framework.
package com.cloud4wi.finatra.controller
import org.cloud4wi.finagle.{WebServerDriverManager, WebServerQueryBuilder, WebServerQueryBuilderService, WebServerLevelHandler, WebServerConfig}
import org.joda.time.DateTime
import org.joda.time.format.DateTimeFormat
import scala.collection.immutable.Map
import scala.collection.mutable.ListBuffer
import com.typesafe.config.ConfigFactory
/**
 * Translates a metric request into a SQL statement and runs it through
 * [[WebServerQueryBuilderService]].
 *
 * Originally created by jmilagroso on 2/10/17.
 */
class MetricsHandler() {
  val config = new WebServerConfig
  val fieldMaps = new FieldMapper

  /** Prints `message` to stdout only when the `debug` configuration flag is set. */
  private def debug(message: => Any): Unit =
    if (config.conf.getBoolean("debug")) println(message)

  /** Doubles embedded single quotes so `value` cannot terminate the SQL string literal it is placed in. */
  private def escapeSqlLiteral(value: String): String = value.replace("'", "''")

  /**
   * Builds the query for `metric` from the request `parameters`, executes it and
   * returns whatever the query service yields.
   *
   * Three query shapes are produced:
   *  - `statistics == "1"`: raw SQL from `Statistics.sql`;
   *  - `metric == "max_concurrent_connection"`: raw SQL from `MetricMaxConcurrentConnection.sql`;
   *  - otherwise: a SELECT over `MINERVA`, restricted to the requested time range and ids.
   *
   * @param metric     comma-delimited metric names (see [[buildFields]])
   * @param parameters request parameters; recognised keys are `breakdown`, `granularity`,
   *                   `tenantId`, `venueId`, `accessPointId`, `startDate`, `endDate`,
   *                   `timezone` and `statistics`. Missing dates default to "now" and a
   *                   missing timezone defaults to the `defaultTimeZone` configuration value.
   * @return the rows returned by `WebServerQueryBuilderService.get()`
   */
  def process(metric: String, parameters: scala.collection.mutable.Map[String, String]):
  ListBuffer[Map[String,String]] = {
    val dt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")

    val breakdown = parameters.getOrElse("breakdown", "")
    val granularity = parameters.getOrElse("granularity", "hourly")
    val tenantId = parameters.getOrElse("tenantId", "")
    val venueId = parameters.getOrElse("venueId", "")
    val accessPointId = parameters.getOrElse("accessPointId", "")
    val startDate = parameters.getOrElse("startDate", dt.print(new DateTime))
    val endDate = parameters.getOrElse("endDate", dt.print(new DateTime))
    val timezone = parameters.getOrElse("timezone", config.conf.getString("defaultTimeZone"))
    val statistics = parameters.getOrElse("statistics", "0")

    // Driver Manager
    val driverManager = new WebServerDriverManager()
    // Query Builder
    val queryBuilder = new WebServerQueryBuilder()

    if (statistics == "1") {
      queryBuilder.raw(new Statistics().sql(metric, parameters))
    } else if (metric == "max_concurrent_connection") {
      queryBuilder.raw(new MetricMaxConcurrentConnection().sql(parameters))
    } else {
      val levelHandler = new WebServerLevelHandler()
      val (levelFields, levelGroupBy, aggregation) = levelHandler.handle(breakdown, granularity, timezone)

      // NOTE(review): a one-character field list is presumably a lone separator
      // (no breakdown columns) - confirm against WebServerLevelHandler.
      val fields =
        if (levelFields.length == 1) levelFields.substring(0, levelFields.length - 1) else levelFields
      // The group-by list always loses its last character (presumably a trailing separator).
      val groupBy =
        if (levelGroupBy.length > 0) levelGroupBy.substring(0, levelGroupBy.length - 1) else levelGroupBy

      debug("fields:" + fields)
      debug("groupBy:" + groupBy)

      val selectFields = buildFields(fields, metric, aggregation)

      // NOTE(review): the ids are caller-supplied and concatenated unquoted; they should be
      // validated or bound as parameters once their expected format is confirmed.
      val whereTenantId = if (tenantId.nonEmpty) " AND TENANT_ID = " + tenantId else ""
      val whereVenueId = if (venueId.nonEmpty) " AND VENUE_ID = " + venueId else ""
      val whereAccessPointId = if (accessPointId.nonEmpty) " AND ACCESS_POINT_ID = " + accessPointId else ""

      // DIM_BR is chosen from the first breakdown column found, in this order of precedence.
      val whereDimBr =
        if (breakdown.contains("ACCESS_POINT_ID")) " AND DIM_BR = 1 "
        else if (breakdown.contains("VENUE_ID")) " AND DIM_BR = 2 "
        else if (breakdown.contains("TENANT_ID")) " AND DIM_BR = 3 "
        else " AND DIM_BR = 0 "

      // Quoted literals are escaped so a stray quote in a request value cannot break out of them.
      val tz = escapeSqlLiteral(timezone)
      val timeRange =
        s"TIMESTAMP > CONVERT_TZ(TO_TIME('${escapeSqlLiteral(startDate)}', '', '${tz}'), '${tz}', 'UTC') AND " +
        s"TIMESTAMP < CONVERT_TZ(TO_TIME('${escapeSqlLiteral(endDate)}', '', '${tz}'), '${tz}', 'UTC') "

      queryBuilder.select(selectFields)
        .from("MINERVA")
        .where(timeRange + whereTenantId + whereVenueId + whereAccessPointId + whereDimBr)
        .groupBy(groupBy)
        .query()
    }

    debug(queryBuilder.sql)

    // Service
    val webServerQueryBuilderService = new WebServerQueryBuilderService(driverManager, queryBuilder)
    webServerQueryBuilderService.get() // List Buffer
  }

  /**
   * Appends the SELECT expressions for every requested metric to `fields`.
   *
   * Each metric name is either a key of `fieldMaps.listMetrics` (one expression) or the
   * name of a metric group such as `demographics_gender` (one expression per group entry).
   * Names are trimmed; empty and unknown names are skipped, so one bad name no longer
   * discards the metrics that follow it.
   *
   * @param fields      leading field list to extend (may be empty)
   * @param metric      comma-delimited metric names, e.g. `unique_users, unique_sessions`
   * @param aggregation selects the aggregate variant of each SQL function
   *                    (`HLL_AGGREGATE`/`TOPK_AGGREGATE`/`SUM`/`TDIGEST_AGGREGATE`) instead of
   *                    the plain one (`HLL_COUNT`/`TOPK_GET`/none/`TDIGEST_GET`)
   * @return the field list without a trailing comma; empty when `fields` is empty and
   *         no metric matched
   */
  def buildFields(fields: String, metric: String, aggregation: Boolean): String = {
    // HLL
    val hll = if (aggregation) "HLL_AGGREGATE" else "HLL_COUNT"
    // TOP K function
    val topk = if (aggregation) "TOPK_AGGREGATE" else "TOPK_GET"
    // SUM function (no wrapper function at all when not aggregating)
    val sum = if (aggregation) "SUM" else ""
    // TDIGEST function
    val tdigest = if (aggregation) "TDIGEST_AGGREGATE" else "TDIGEST_GET"

    // Maps: each FieldMapper list is a flat sequence of alternating key/value entries.
    val mapListMetrics = fieldMaps.listMetrics.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapGenderMetrics = fieldMaps.listDemographicGender.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapAgeMetrics = fieldMaps.listDemographicAge.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapImpressionMetrics = fieldMaps.listImpressions.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapTrafficDataMetrics = fieldMaps.listTrafficDataConsumption.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapTrafficConnectionMetrics = fieldMaps.listTrafficConnection.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapBehaviourFreqMetrics = fieldMaps.listBehaviorFrequency.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapBehaviourRecencyMetrics = fieldMaps.listBehaviorRecency.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapConnectionDurationMetrics = fieldMaps.listConnectionDuration.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapUserAnalyticsMetrics = fieldMaps.listUserAnalytics.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapPresenceDurationMetrics = fieldMaps.listPresenceDuration.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapPresenceFrequencyMetrics = fieldMaps.listPresenceFrequency.grouped(2).collect { case List(k, v) => k -> v }.toMap
    val mapSocialMetrics = fieldMaps.listSocialAnalytics.grouped(2).collect { case List(k, v) => k -> v }.toMap

    val out = new StringBuilder(fields)

    // Renders "<fn>(<column>) as <alias>,"; an empty `fn` yields a bare parenthesised column.
    def call(fn: String, column: String, alias: String): String =
      fn + "(" + column + ") as " + alias + ","

    // Appends one rendered expression per (alias -> column) entry of a metric group.
    def appendEach(columns: Map[String, String])(render: (String, String) => String): Unit =
      columns.foreach { case (alias, column) => out.append(render(alias, column)) }

    // Comma-delimited metrics: unique_users, unique_sessions, etc
    val requested = metric.split(",").map(_.trim).filter(_.nonEmpty)

    requested.foreach { name =>
      mapListMetrics.get(name) match {
        // listMetrics
        case Some(column) =>
          name match {
            // || any other metrics that uses topk otherwise create a set.
            case "signup_type" =>
              out.append(call(topk, column, name))
            case "returning_users" =>
              out.append(hll + "(IDS_HLL) - " + hll + "(NEW_USERS_HLL) AS " + name + ",")
            case "unique_presence_passerbys_c" | "presence_attraction_rate" =>
              out.append(call("", column, name))
            case _ =>
              out.append(call(hll, column, name))
          }

        case None =>
          name match {
            // Demographics Gender
            case "demographics_gender" =>
              appendEach(mapGenderMetrics)((alias, column) => call(hll, column, alias))
            // Demographics Age
            case "demographics_age" =>
              appendEach(mapAgeMetrics)((alias, column) => call(hll, column, alias))
            // Impressions
            case "impressions" =>
              appendEach(mapImpressionMetrics) { (alias, column) =>
                if (alias == "impression_count") call(sum, column, alias)
                else call(hll, column, alias)
              }
            // Traffic Data Consumption
            case "traffic_data_consumption" =>
              appendEach(mapTrafficDataMetrics) { (alias, column) =>
                if (alias == "data_consumed_median") call(tdigest, column, alias)
                else if (alias == "data_consumed_avg_user") "(" + column + ") as DATA_CONSUMED_AVG_USER,"
                else call(sum, column, alias)
              }
            // Traffic Connection Time
            case "traffic_connection_time" =>
              appendEach(mapTrafficConnectionMetrics) { (alias, column) =>
                if (alias == "data_connection_median") call(tdigest, column, alias)
                else if (alias == "data_connection_avg_user") "(" + column + ") as DATA_CONNECTION_AVG_USER,"
                else call(sum, column, alias)
              }
            // Behaviour Frequency
            case "behavior_frequency" =>
              appendEach(mapBehaviourFreqMetrics)((alias, column) => call(hll, column, alias))
            // Behaviour Recency
            case "behavior_recency" =>
              appendEach(mapBehaviourRecencyMetrics)((alias, column) => call(hll, column, alias))
            // Connection Duration
            case "connection_duration" =>
              appendEach(mapConnectionDurationMetrics)((alias, column) => call(hll, column, alias))
            // User Analytics
            case "user_analytics" =>
              appendEach(mapUserAnalyticsMetrics)((alias, column) => call(topk, column, alias))
            // Presence Duration
            case "presence_duration" =>
              appendEach(mapPresenceDurationMetrics) { (alias, column) =>
                if (alias == "p_duration_avg") call("", column, alias)
                else call(hll, column, alias)
              }
            // Presence Frequency
            case "presence_frequency" =>
              appendEach(mapPresenceFrequencyMetrics)((alias, column) => call(hll, column, alias))
            // Social Analytics
            case "social_analytics" =>
              appendEach(mapSocialMetrics)((alias, column) => call(hll, column, alias))
            // Unknown names are skipped instead of aborting the remaining metrics.
            case _ =>
              debug("unknown metric skipped: " + name)
          }
      }
    }

    // Drop the trailing separator only when one is present, so an empty or
    // separator-free result is returned untouched instead of throwing or losing a character.
    val built = out.toString
    if (built.endsWith(",")) built.substring(0, built.length - 1) else built
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment