Created
April 21, 2017 02:11
-
-
Save jmilagroso/9dee75bfaaf80ee27a7e53dc47699601 to your computer and use it in GitHub Desktop.
Dynamic mapping of analytics metrics to HBase queries, written with the Finatra framework.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.cloud4wi.finatra.controller | |
import org.cloud4wi.finagle.{WebServerDriverManager, WebServerQueryBuilder, WebServerQueryBuilderService, WebServerLevelHandler, WebServerConfig} | |
import org.joda.time.DateTime | |
import org.joda.time.format.DateTimeFormat | |
import scala.collection.immutable.Map | |
import scala.collection.mutable.ListBuffer | |
import com.typesafe.config.ConfigFactory | |
/**
 * Translates analytics metric requests into SQL over the MINERVA table and runs
 * them through [[WebServerQueryBuilderService]].
 *
 * Created by jmilagroso on 2/10/17.
 */
class MetricsHandler() {
  val config = new WebServerConfig
  val fieldMaps = new FieldMapper

  /**
   * Builds and executes the query for `metric`.
   *
   * Routing:
   *  - `statistics == "1"`: the SQL is produced by `Statistics`;
   *  - `metric == "max_concurrent_connection"`: the SQL is produced by `MetricMaxConcurrentConnection`;
   *  - otherwise a SELECT over MINERVA is assembled from the breakdown, granularity,
   *    time range and tenant/venue/access-point filters.
   *
   * @param metric     comma-separated metric names (e.g. "unique_users,unique_sessions")
   * @param parameters request parameters; keys read here are breakdown, granularity, tenantId,
   *                   venueId, accessPointId, startDate, endDate, timezone and statistics
   * @return the rows returned by `WebServerQueryBuilderService.get()`
   * @throws IllegalArgumentException on the MINERVA path, when tenantId/venueId/accessPointId is
   *                                  non-empty and not all digits, or when startDate/endDate/timezone
   *                                  contain characters outside the allowed set. These values are
   *                                  spliced into the SQL text, so they must not carry quotes.
   */
  def process(metric: String, parameters: scala.collection.mutable.Map[String, String]): ListBuffer[Map[String, String]] = {
    val dt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
    val breakdown = parameters.getOrElse("breakdown", "")
    val granularity = parameters.getOrElse("granularity", "hourly")
    val tenantId = parameters.getOrElse("tenantId", "")
    val venueId = parameters.getOrElse("venueId", "")
    val accessPointId = parameters.getOrElse("accessPointId", "")
    // Both ends of the range default to the current time, formatted with `dt`.
    val startDate = parameters.getOrElse("startDate", dt.print(new DateTime))
    val endDate = parameters.getOrElse("endDate", dt.print(new DateTime))
    val timezone = parameters.getOrElse("timezone", config.conf.getString("defaultTimeZone"))
    val statistics = parameters.getOrElse("statistics", "0")

    // Driver Manager
    val driverManager = new WebServerDriverManager()
    // Query Builder
    val queryBuilder = new WebServerQueryBuilder()

    if (statistics == "1") {
      queryBuilder.raw(new Statistics().sql(metric, parameters))
    } else if (metric == "max_concurrent_connection") {
      queryBuilder.raw(new MetricMaxConcurrentConnection().sql(parameters))
    } else {
      // Validate every value that ends up inside the SQL text before any of it is used.
      val tz = safeLiteral("timezone", timezone)
      val start = safeLiteral("startDate", startDate)
      val end = safeLiteral("endDate", endDate)

      val (rawFields, rawGroupBy, aggregation) =
        new WebServerLevelHandler().handle(breakdown, granularity, tz)
      // A one-character field list is treated as empty
      // (presumably a lone separator; confirm against WebServerLevelHandler).
      val baseFields = if (rawFields.length == 1) "" else rawFields
      // Drop the trailing character of the GROUP BY list; a no-op when it is empty.
      val groupBy = rawGroupBy.dropRight(1)
      debug("fields:" + baseFields)
      debug("groupBy:" + groupBy)

      val selectFields = buildFields(baseFields, metric, aggregation)

      val idFilters =
        idFilter("TENANT_ID", "tenantId", tenantId) +
          idFilter("VENUE_ID", "venueId", venueId) +
          idFilter("ACCESS_POINT_ID", "accessPointId", accessPointId)

      // DIM_BR is chosen from the breakdown, checked in the order
      // ACCESS_POINT_ID, VENUE_ID, TENANT_ID; the first match wins.
      val dimBrFilter =
        if (breakdown.contains("ACCESS_POINT_ID")) " AND DIM_BR = 1 "
        else if (breakdown.contains("VENUE_ID")) " AND DIM_BR = 2 "
        else if (breakdown.contains("TENANT_ID")) " AND DIM_BR = 3 "
        else " AND DIM_BR = 0 "

      queryBuilder.select(selectFields)
        .from("MINERVA")
        .where(
          s"TIMESTAMP > CONVERT_TZ(TO_TIME('$start', '', '$tz'), '$tz', 'UTC') AND " +
            s"TIMESTAMP < CONVERT_TZ(TO_TIME('$end', '', '$tz'), '$tz', 'UTC') " +
            idFilters + dimBrFilter
        )
        .groupBy(groupBy)
        .query()
    }

    debug(queryBuilder.sql)

    // Service
    val webServerQueryBuilderService = new WebServerQueryBuilderService(driverManager, queryBuilder)
    webServerQueryBuilderService.get() // List Buffer
  }

  /**
   * Appends one SELECT column per requested metric to `fields`.
   *
   * @param fields      column list coming from the breakdown; expected to be empty or to end with ","
   * @param metric      comma-separated metric names. Surrounding whitespace is ignored, empty
   *                    entries are skipped, and unknown names are skipped (printed when `debug` is on).
   * @param aggregation true selects the HLL_AGGREGATE / TOPK_AGGREGATE / SUM / TDIGEST_AGGREGATE
   *                    functions; false selects HLL_COUNT / TOPK_GET / (none) / TDIGEST_GET
   * @return the combined column list with any trailing comma removed
   */
  def buildFields(fields: String, metric: String, aggregation: Boolean): String = {
    val hll = if (aggregation) "HLL_AGGREGATE" else "HLL_COUNT"
    val topk = if (aggregation) "TOPK_AGGREGATE" else "TOPK_GET"
    // Without aggregation no function is applied, yielding a bare "(expr)".
    val sum = if (aggregation) "SUM" else ""
    val tdigest = if (aggregation) "TDIGEST_AGGREGATE" else "TDIGEST_GET"

    // FieldMapper lists are read as flat alias, expression, alias, expression, ... sequences.
    def pairs(flat: Seq[String]): Map[String, String] =
      flat.grouped(2).collect { case Seq(alias, expr) => alias -> expr }.toMap

    // Renders "fn(expr) as alias".
    def call(fn: String): (String, String) => String =
      (alias, expr) => s"$fn($expr) as $alias"

    // Renders every alias/expression pair of a FieldMapper list, in map iteration order.
    def all(flat: Seq[String])(render: (String, String) => String): Seq[String] =
      pairs(flat).toSeq.map { case (alias, expr) => render(alias, expr) }

    val listMetrics = pairs(fieldMaps.listMetrics)

    // Columns contributed by one metric name; unknown names yield none.
    def columnsFor(name: String): Seq[String] = listMetrics.get(name) match {
      case Some(expr) =>
        name match {
          // Any other metric that uses top-k belongs here; otherwise it is an HLL set.
          case "signup_type" => Seq(s"$topk($expr) as $name")
          case "returning_users" => Seq(s"$hll(IDS_HLL) - $hll(NEW_USERS_HLL) AS $name")
          case "unique_presence_passerbys_c" | "presence_attraction_rate" => Seq(s"($expr) as $name")
          case _ => Seq(call(hll)(name, expr))
        }
      case None =>
        name match {
          case "demographics_gender" => all(fieldMaps.listDemographicGender)(call(hll))
          case "demographics_age" => all(fieldMaps.listDemographicAge)(call(hll))
          case "impressions" =>
            all(fieldMaps.listImpressions) { (alias, expr) =>
              if (alias == "impression_count") call(sum)(alias, expr) else call(hll)(alias, expr)
            }
          case "traffic_data_consumption" =>
            all(fieldMaps.listTrafficDataConsumption) { (alias, expr) =>
              alias match {
                case "data_consumed_median" => call(tdigest)(alias, expr)
                case "data_consumed_avg_user" => s"($expr) as DATA_CONSUMED_AVG_USER"
                case _ => call(sum)(alias, expr)
              }
            }
          case "traffic_connection_time" =>
            all(fieldMaps.listTrafficConnection) { (alias, expr) =>
              alias match {
                case "data_connection_median" => call(tdigest)(alias, expr)
                case "data_connection_avg_user" => s"($expr) as DATA_CONNECTION_AVG_USER"
                case _ => call(sum)(alias, expr)
              }
            }
          case "behavior_frequency" => all(fieldMaps.listBehaviorFrequency)(call(hll))
          case "behavior_recency" => all(fieldMaps.listBehaviorRecency)(call(hll))
          case "connection_duration" => all(fieldMaps.listConnectionDuration)(call(hll))
          case "user_analytics" => all(fieldMaps.listUserAnalytics)(call(topk))
          case "presence_duration" =>
            all(fieldMaps.listPresenceDuration) { (alias, expr) =>
              if (alias == "p_duration_avg") s"($expr) as $alias" else call(hll)(alias, expr)
            }
          case "presence_frequency" => all(fieldMaps.listPresenceFrequency)(call(hll))
          case "social_analytics" => all(fieldMaps.listSocialAnalytics)(call(hll))
          case unknown =>
            // Skip only this name. Previously a MatchError here silently dropped every later metric.
            debug("Unknown metric: " + unknown)
            Nil
        }
    }

    val columns = metric.split(",").toSeq.map(_.trim).filter(_.nonEmpty).flatMap(columnsFor)
    // Each column is followed by a separator; the final one is removed.
    // This is also safe when both `fields` and `columns` are empty.
    (fields + columns.map(_ + ",").mkString).stripSuffix(",")
  }

  /** Prints `message` to stdout when the `debug` config flag is set. */
  private def debug(message: => Any): Unit =
    if (config.conf.getBoolean("debug")) println(message)

  /**
   * Renders `" AND column = value"`, or "" when `value` is empty.
   * The value is spliced into the SQL unquoted, so only ASCII digits are accepted.
   */
  private def idFilter(column: String, paramName: String, value: String): String =
    if (value.isEmpty) ""
    else {
      require(value.matches("\\d+"), s"$paramName must be a numeric id")
      s" AND $column = $value"
    }

  /**
   * Returns `value` unchanged if it only uses characters that can legitimately appear in a
   * date/time or time-zone ID (letters, digits, space, '_', ':', '.', '/', '+', '-').
   * This rejects quotes and backslashes that could break out of a SQL string literal.
   */
  private def safeLiteral(paramName: String, value: String): String = {
    require(value.matches("[A-Za-z0-9 _:./+-]*"), s"$paramName contains unsupported characters")
    value
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment