|
import com.lumoslabs.events.common.Event |
|
import org.apache.spark.sql.functions._ |
|
import org.apache.spark.sql.Encoder |
|
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder |
|
import org.apache.spark.sql.Encoders |
|
import org.apache.spark.sql.expressions.Aggregator |
|
import org.apache.spark.sql.Dataset |
|
import org.apache.spark.sql.TypedColumn |
|
import org.apache.spark.sql.functions.udf |
|
import java.time.Instant |
|
import spark.implicits._ |
|
|
|
// UDFs |
|
|
|
/**
 * Spark UDF that labels a visit as "new", "returning" or "unknown".
 *
 * The parameters are boxed (`java.lang.Long`) on purpose: with primitive `Long` parameters
 * Spark does not invoke the function for a null input (the UDF result is simply null), so a
 * null check inside the function body could never succeed.
 *
 * Result:
 *  - "unknown"   when either timestamp is null
 *  - "new"       when `(clientTimestamp - visitTimestamp) / 1000` is below 150
 *  - "returning" otherwise
 *
 * NOTE(review): the division by 1000 suggests the inputs are epoch milliseconds and the
 * threshold is 150 seconds — confirm against the event schema.
 */
val visitType = udf((clientTimestamp: java.lang.Long, visitTimestamp: java.lang.Long) => {
  val elapsed = for {
    client <- Option(clientTimestamp)
    visit  <- Option(visitTimestamp)
  } yield (client.longValue - visit.longValue) / 1000

  elapsed match {
    case None                 => "unknown"
    case Some(d) if d < 150   => "new"
    case Some(_)              => "returning"
  }
})
|
|
|
/**
 * Spark UDF: true when the row has no user id at all, or when it carries a sign-up user id.
 *
 * The parameters are boxed (`java.lang.Integer`) so that null column values actually reach
 * the function; with primitive `Int` parameters Spark returns null for null inputs without
 * calling the function, and the null checks below would be constant.
 */
val isUnregistered = udf((userId: java.lang.Integer, signupUserId: java.lang.Integer) =>
  Option(userId).isEmpty || Option(signupUserId).isDefined)
|
|
|
// functions |
|
/** Derives a referral key from Adjust attribution events. */
object AdjustReferralExtract extends Serializable {

  /** Name of the only event for which a referral key is ever produced. */
  val AdjustAttributionEvent = "adjust_attribution_received"

  // Campaign of the form "<word> <anything>(<digits>)", matched against the whole string.
  private val CreativePattern = "\\A\\w+ .*\\((\\d+)\\)\\z".r

  // Campaign consisting solely of digits.
  private val IntegerPattern = "\\A(\\d+)\\z".r

  /**
   * Extracts the referral key for an event.
   *
   * @param name     event name; anything other than [[AdjustAttributionEvent]] yields `None`
   * @param network  optional attribution network
   * @param campaign optional campaign string
   * @return `Some("5217")` when the network is "YouTubeInstalls"; otherwise the first
   *         space-delimited word of a campaign matching the creative pattern; otherwise the
   *         campaign itself when it is all digits; `None` in every other case (including
   *         missing, blank or unrecognised campaigns)
   */
  def extract(name: String, network: Option[String], campaign: Option[String]): Option[String] =
    (name, network, campaign) match {
      case (AdjustAttributionEvent, Some("YouTubeInstalls"), _) =>
        Some("5217")
      // The pattern guarantees a leading word followed by a space, so the split is non-empty.
      case (AdjustAttributionEvent, _, Some(creative @ CreativePattern(_))) =>
        creative.split(" ").headOption
      case (AdjustAttributionEvent, _, Some(IntegerPattern(plainId))) =>
        Some(plainId)
      case _ =>
        None
    }
}
|
|
|
/** Returns `o` unchanged when `m` equals `n`; otherwise `None`. */
def optionWhen[T](m: String, n: String, o: Option[T]): Option[T] =
  m match {
    case `n` => o
    case _   => None
  }
|
/** Indicator flag: 1 when `m` equals `n`, 0 otherwise. */
def eventExists(m: String, n: String): Int = {
  val sameEvent = m == n
  if (sameEvent) 1 else 0
}
|
/**
 * Indicator flag: 1 when `n` is the "page_view" event and `page` holds exactly `mPage`,
 * 0 otherwise.
 */
def pageExists(mPage: String, n: String, page: Option[String]): Int = {
  val isPageView = n == "page_view"
  if (isPageView && page.exists(_ == mPage)) 1 else 0
}
|
|
|
// case class |
|
/**
 * Flat, per-event row built from a parsed event before visit-level aggregation.
 *
 * @param name                 event name
 * @param timestamp            raw timestamp from the event header
 * @param visitId              visit id
 * @param visitTimestamp       raw timestamp of the visit
 * @param clientId             client id
 * @param clientTimestamp      raw timestamp of the client
 * @param userId               user id, when the event has one
 * @param assignmentId         assignment id
 * @param ipAddress            IP address, when present
 * @param appName              value of the "app_name" event property
 * @param clientPlatform       value of the "client_platform" event property
 * @param referralKey          value of the "referral_key" event property
 * @param preferredLanguage    preferred language, when present
 * @param signupReferral       "referral_key" property, kept only for "sign_up" events
 * @param signupUserId         user id, kept only for "sign_up" events
 * @param signupTimestamp      header raw timestamp, kept only for "sign_up" events
 * @param purchasePageView     1 for a "purchase_page_view" event, else 0
 * @param purchase             1 for a "purchase" event, else 0
 * @param splashPageView       1 for a "page_view" event whose page is "Splash", else 0
 * @param introductionPageView 1 for a "page_view" event whose page is "Introduction", else 0
 * @param signupPageView       1 for a "page_view" event whose page is "Signup", else 0
 * @param adjustReferral       referral key derived by `AdjustReferralExtract.extract`
 */
case class ProjectedFields(
  // identity and timing
  name: String,
  timestamp: Long,
  visitId: String,
  visitTimestamp: Long,
  clientId: String,
  clientTimestamp: Long,
  userId: Option[Int],
  assignmentId: String,
  // descriptive attributes
  ipAddress: Option[String],
  appName: Option[String],
  clientPlatform: Option[String],
  referralKey: Option[String],
  preferredLanguage: Option[String],
  // populated only on "sign_up" events
  signupReferral: Option[String],
  signupUserId: Option[Int],
  signupTimestamp: Option[Long],
  // 0/1 indicator flags
  purchasePageView: Int,
  purchase: Int,
  splashPageView: Int,
  introductionPageView: Int,
  signupPageView: Int,
  // derived referral
  adjustReferral: Option[String]
)
|
|
|
/**
 * One row per visit. The field names line up, in order, with the grouping keys and the
 * aggregated / derived columns produced by the visit aggregation in this file.
 *
 * NOTE(review): this class is not referenced in the visible code; presumably it is the
 * intended target of a typed conversion of the aggregation result — confirm.
 * NOTE(review): `userId` / `signupUserId` are `Option[Long]` here but `Option[Int]` in
 * `ProjectedFields`, and `appName` is non-optional here although it is optional there —
 * confirm these differences are intentional.
 *
 * @param visitId           visit id (grouping key)
 * @param assignmentId      assignment id (grouping key)
 * @param ipAddress         IP address (grouping key), when present
 * @param visitTimestamp    visit timestamp (grouping key)
 * @param clientId          client id
 * @param clientTimestamp   client timestamp
 * @param userId            user id, when present
 * @param signupUserId      user id taken from a sign-up event, when present
 * @param signupReferral    referral key taken from a sign-up event, when present
 * @param signupTimestamp   timestamp of a sign-up event, when present
 * @param adjustReferral    Adjust-derived referral key, when present
 * @param appName           application name
 * @param clientPlatform    client platform, when present
 * @param preferredLanguage preferred language, when present
 * @param purchases         summed purchase flags
 * @param purchasePageViews summed purchase-page-view flags
 * @param visitEndedAt      largest event timestamp in the visit
 * @param visitStartedAt    smallest event timestamp in the visit
 * @param visitType         "new", "returning" or "unknown"
 * @param isUnregistered    unregistered flag, when it could be computed
 */
case class VisitFact(
  // grouping keys
  visitId: String,
  assignmentId: String,
  ipAddress: Option[String],
  visitTimestamp: Long,
  // first-seen attributes
  clientId: String,
  clientTimestamp: Long,
  userId: Option[Long],
  signupUserId: Option[Long],
  signupReferral: Option[String],
  signupTimestamp: Option[Long],
  adjustReferral: Option[String],
  appName: String,
  clientPlatform: Option[String],
  preferredLanguage: Option[String],
  // totals and time span
  purchases: Option[Long],
  purchasePageViews: Option[Long],
  visitEndedAt: Long,
  visitStartedAt: Long,
  // derived columns
  visitType: String,
  isUnregistered: Option[Boolean]
)
|
|
|
// constants and vals |
|
// App names an event must carry to be kept by the event filter.
val IosAppNames: Set[String] = Set("Lumosity Mobile", "Lumosity Mobile iPad")

// Event names that the event filter always drops.
val BlackListEvents: Set[String] = Set("split_test_assigned", "bulk_email_unsubscribe")

// Columns that together identify one visit row in the aggregation.
val GroupByKeys: List[String] = List("visitId", "assignmentId", "ipAddress", "visitTimestamp")
|
|
|
// spark job |
|
//val events = sc.textFile("/mnt/lumos-data-dump-dev01/andy/test/events_small/") |
|
// Raw event lines read from a single hard-coded day directory.
val events = sc.textFile("/mnt/lumos-events/yyyy=2017/mm=03/dd=11")

// Parse each line into an Event, keep only the usable ones, and flatten every survivor
// into a ProjectedFields row.
val projectedFields = events
  .map(line => Event(line))
  .filter { event =>
    // The conditions are evaluated in this order and short-circuit on the first failure.
    event.isValid &&
      !BlackListEvents.contains(event.name) &&
      IosAppNames.contains(event.appName) &&
      event.visit.id.isDefined &&
      event.visit.timestamp.isDefined &&
      event.client.id.isDefined &&
      event.client.timestamp.isDefined &&
      event.assignmentId.isDefined
  }
  .map { event =>
    // NOTE(review): the filter checks `visit.timestamp` / `client.timestamp`, while the
    // `.get` calls below are on `rawTimestamp` (and on `header.rawTimestamp`, which the
    // filter does not check directly) — presumably `isValid` and the timestamp checks
    // guarantee these are defined; confirm.
    val eventName = event.name
    val headerTimestamp = event.header.rawTimestamp
    val properties = event.event
    val referralKey = properties.property("referral_key").asString
    val page = properties.property("page").asString

    ProjectedFields(
      name = eventName,
      timestamp = headerTimestamp.get,
      visitId = event.visit.id.get,
      visitTimestamp = event.visit.rawTimestamp.get,
      clientId = event.client.id.get,
      clientTimestamp = event.client.rawTimestamp.get,
      userId = event.userId,
      assignmentId = event.assignmentId.get,
      ipAddress = event.ipAddress,
      appName = properties.property("app_name").asString,
      clientPlatform = properties.property("client_platform").asString,
      referralKey = referralKey,
      preferredLanguage = event.preferredLanguage,
      // Sign-up details are only kept when this event is the sign-up itself.
      signupReferral = optionWhen[String]("sign_up", eventName, referralKey),
      signupUserId = optionWhen[Int]("sign_up", eventName, event.userId),
      signupTimestamp = optionWhen[Long]("sign_up", eventName, headerTimestamp),
      // 0/1 flags that are summed per visit later on.
      purchasePageView = eventExists("purchase_page_view", eventName),
      purchase = eventExists("purchase", eventName),
      splashPageView = pageExists("Splash", eventName, page),
      introductionPageView = pageExists("Introduction", eventName, page),
      signupPageView = pageExists("Signup", eventName, page),
      adjustReferral = AdjustReferralExtract.extract(
        eventName,
        properties.property("network").asString,
        properties.property("campaign").asString
      )
    )
  }
  .toDS
|
|
|
// Show the inferred schema of the per-event rows.
projectedFields.printSchema()

// Collapse the per-event rows into one row per visit (as identified by GroupByKeys), add the
// derived visitType / isUnregistered columns, and pull back the first 20 rows for inspection.
//
// NOTE(review): `first` depends on row order, which Spark does not guarantee after a
// shuffle, so which non-null value is picked per visit may vary between runs — confirm
// this is acceptable.
projectedFields
  .groupBy(GroupByKeys.head, GroupByKeys.tail: _*)
  .agg(
    // Any non-null value seen within the visit.
    first($"clientId", ignoreNulls = true).as("clientId"),
    first($"clientTimestamp", ignoreNulls = true).as("clientTimestamp"),
    first($"userId", ignoreNulls = true).as("userId"),
    first($"signupUserId", ignoreNulls = true).as("signupUserId"),
    first($"signupReferral", ignoreNulls = true).as("signupReferral"),
    first($"signupTimestamp", ignoreNulls = true).as("signupTimestamp"),
    first($"adjustReferral", ignoreNulls = true).as("adjustReferral"),
    first($"appName", ignoreNulls = true).as("appName"),
    first($"clientPlatform", ignoreNulls = true).as("clientPlatform"),
    first($"preferredLanguage", ignoreNulls = true).as("preferredLanguage"),
    // Totals of the 0/1 indicator flags.
    sum($"purchase").as("purchases"),
    sum($"purchasePageView").as("purchasePageViews"),
    // Time span covered by the visit's events.
    max($"timestamp").as("visitEndedAt"),
    min($"timestamp").as("visitStartedAt")
  )
  .withColumn("visitType", visitType($"clientTimestamp", $"visitTimestamp"))
  .withColumn("isUnregistered", isUnregistered($"userId", $"signupUserId"))
  .take(20)
Some general comments:
I went through the entire list here and marked each column as keep or not keep, but I'm open to suggestions (I'm also getting the acquisitions team to review it):
https://docs.google.com/spreadsheets/d/1Bdudm6sCA-0N7Dg1GD1cv2lgNjam_BtIqrWkfOFC1FU/edit#gid=0