Skip to content

Instantly share code, notes, and snippets.

@dollschasingmen
Last active September 14, 2017 21:26
Show Gist options
  • Save dollschasingmen/b1e0b855b746a5502e55371820984d16 to your computer and use it in GitHub Desktop.
Save dollschasingmen/b1e0b855b746a5502e55371820984d16 to your computer and use it in GitHub Desktop.
revisit visits

CREATE TABLE

-- One row per web visit.
-- NOTE: the COPY for this table loads tab-delimited files without a column list,
-- so the column order here presumably has to match the field order of those files.
CREATE TABLE views.web_visit_facts
(
  -- visit / visitor identity
  visit_id VARCHAR(96) ENCODE LZO,
  assignment_id VARCHAR(64) ENCODE LZO,
  ip_address VARCHAR(96) ENCODE LZO,
  country_code VARCHAR(2) ENCODE LZO,
  dma INTEGER ENCODE DELTA,
  visit_timestamp BIGINT ENCODE DELTA32K,
  -- referral / channel attribution
  ll_source VARCHAR(256) ENCODE LZO,
  ll_channel VARCHAR(256) ENCODE LZO,
  visit_referral_key VARCHAR(64) ENCODE LZO,
  client_id VARCHAR(64) ENCODE LZO,
  client_timestamp BIGINT ENCODE DELTA32K,
  user_id INTEGER ENCODE DELTA,
  -- sign-up details
  signup_user_id INTEGER ENCODE DELTA,
  signup_referral_key VARCHAR(96) ENCODE LZO,
  signup_timestamp BIGINT ENCODE DELTA32K,
  ll_ad_id BIGINT ENCODE DELTA32K,
  creative_id BIGINT ENCODE DELTA32K,
  subaffiliate_id VARCHAR(256) ENCODE LZO,
  is_facebook_signup VARCHAR(16) ENCODE LZO,
  -- client splits
  app_name VARCHAR(64) ENCODE LZO,
  client_platform VARCHAR(64) ENCODE LZO,
  preferred_language VARCHAR(2) ENCODE LZO,
  -- landing page information
  landing_page_id INTEGER ENCODE DELTA,
  landing_experience_id INTEGER ENCODE DELTA,
  landing_experience_set_id INTEGER ENCODE DELTA,
  -- per-visit counters
  purchases INTEGER ENCODE DELTA,
  purchase_page_views INTEGER ENCODE DELTA,
  homepage_views INTEGER ENCODE DELTA,
  billing_page_views INTEGER ENCODE DELTA,
  signup_page_views INTEGER ENCODE DELTA,
  -- visit bounds and funnel flags
  visit_ended_timestamp BIGINT ENCODE DELTA32K,
  visit_started_timestamp BIGINT ENCODE DELTA32K,
  visit_type VARCHAR(16) ENCODE LZO,
  is_unregistered BOOLEAN,
  is_funnel_top BOOLEAN,
  is_new_visitor BOOLEAN
)
-- rows are distributed by user_id and sorted by visit_timestamp
DISTSTYLE KEY
DISTKEY(user_id)
SORTKEY(visit_timestamp);

-- read access for the read_only group
grant select on table views.web_visit_facts to GROUP read_only;
-- One row per iOS visit.
-- NOTE: the COPY for this table loads tab-delimited files without a column list,
-- so the column order here presumably has to match the field order of those files.
CREATE TABLE views.ios_visit_facts
(
  -- visit / visitor identity
  visit_id VARCHAR(96) ENCODE LZO,
  assignment_id VARCHAR(64) ENCODE LZO,
  ip_address VARCHAR(96) ENCODE LZO,
  country_code VARCHAR(2) ENCODE LZO,
  dma INTEGER ENCODE DELTA,
  visit_timestamp BIGINT ENCODE DELTA32K,
  client_id VARCHAR(64) ENCODE LZO,
  client_timestamp BIGINT ENCODE DELTA32K,
  user_id INTEGER ENCODE DELTA,
  -- sign-up details
  signup_user_id INTEGER ENCODE DELTA,
  signup_referral_key VARCHAR(96) ENCODE LZO,
  signup_timestamp BIGINT ENCODE DELTA32K,
  -- Adjust attribution
  adjust_referral VARCHAR(96) ENCODE LZO,
  adjust_network VARCHAR(256) ENCODE LZO,
  adjust_campaign VARCHAR(256) ENCODE LZO,
  -- client splits
  app_name VARCHAR(64) ENCODE LZO,
  client_platform VARCHAR(64) ENCODE LZO,
  preferred_language VARCHAR(2) ENCODE LZO,
  -- per-visit counters
  purchases INTEGER ENCODE DELTA,
  purchase_page_views INTEGER ENCODE DELTA,
  splash_page_views INTEGER ENCODE DELTA,
  introduction_page_views INTEGER ENCODE DELTA,
  signup_page_views INTEGER ENCODE DELTA,
  -- visit bounds and flags
  visit_ended_timestamp BIGINT ENCODE DELTA32K,
  visit_started_timestamp BIGINT ENCODE DELTA32K,
  visit_type VARCHAR(16) ENCODE LZO,
  is_unregistered BOOLEAN
)
-- rows are distributed by user_id and sorted by visit_timestamp
DISTSTYLE KEY
DISTKEY(user_id)
SORTKEY(visit_timestamp);

-- read access for the read_only group
grant select on table views.ios_visit_facts to GROUP read_only;

COPY

-- Load one day (2017-05-11) of gzipped, tab-delimited web visit facts from S3.
-- Empty fields and the literal string 'null' are loaded as NULL, short records
-- are padded with NULLs, and over-long values are truncated to the column width.
-- The credential values are blank placeholders and must be filled in before running.
COPY views.web_visit_facts
FROM 's3://lumos-data/web_visit_facts/tsv/yyyy=2017/mm=05/dd=11'
CREDENTIALS 'aws_access_key_id= ;aws_secret_access_key= '
GZIP DELIMITER '\t' EMPTYASNULL FILLRECORD NULL 'null' TRUNCATECOLUMNS;
-- Load one day (2017-05-11) of gzipped, tab-delimited iOS visit facts from S3.
-- Empty fields and the literal string 'null' are loaded as NULL, short records
-- are padded with NULLs, and over-long values are truncated to the column width.
-- The credential values are blank placeholders and must be filled in before running.
COPY views.ios_visit_facts
FROM 's3://lumos-data/ios_visit_facts/tsv/yyyy=2017/mm=05/dd=11'
CREDENTIALS 'aws_access_key_id= ;aws_secret_access_key= '
GZIP DELIMITER '\t' EMPTYASNULL FILLRECORD NULL 'null' TRUNCATECOLUMNS;
import com.lumoslabs.events.common.Event
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.functions.udf
import java.time.Instant
import spark.implicits._
// UDFs
/**
 * Classifies a visit as "new", "returning" or "unknown".
 *
 * The arguments are boxed (`java.lang.Long`) on purpose: with primitive `Long`
 * parameters a missing value can never be observed inside the function, so the
 * "unknown" branch would be unreachable.
 *
 * A visit is "new" when it started less than 150 seconds after the client
 * timestamp (timestamps are treated as milliseconds, as in the original
 * division by 1000); otherwise it is "returning".
 */
val visitType = udf((clientTimestamp: java.lang.Long, visitTimestamp: java.lang.Long) => {
  val classified = for {
    clientMillis <- Option(clientTimestamp)
    visitMillis <- Option(visitTimestamp)
  } yield {
    // visit minus client: the elapsed time between the client timestamp and this visit
    val elapsedSeconds = (visitMillis.longValue - clientMillis.longValue) / 1000
    if (elapsedSeconds < 150) "new" else "returning"
  }
  classified.getOrElse("unknown")
})
/**
 * True when the visit has no user id, or when a sign-up user id is present.
 *
 * The arguments are boxed (`java.lang.Integer`) so that a missing value is
 * visible as `null`; with primitive `Int` parameters `Option(userId).isEmpty`
 * is always false and `Option(signupUserId).isDefined` is always true.
 */
val isUnregistered = udf((userId: java.lang.Integer, signupUserId: java.lang.Integer) =>
  Option(userId).isEmpty || Option(signupUserId).isDefined)
// functions
/**
 * Derives a referral value from the network and campaign of an
 * "adjust_attribution_received" event.
 */
object AdjustReferralExtract extends Serializable {
  val AdjustAttributionEvent = "adjust_attribution_received"

  // "<word> <anything>(<digits>)": group 1 is the leading word, group 2 the digits in parentheses.
  private val CreativePattern = "\\A(\\w+) .*\\((\\d+)\\)\\z".r
  // A campaign that consists of digits only.
  private val IntegerPattern = "\\A(\\d+)\\z".r

  /**
   * @param name     event name; anything other than [[AdjustAttributionEvent]] yields `None`
   * @param network  attribution network, if present
   * @param campaign attribution campaign, if present
   * @return the hard-coded value "5217" for the "YouTubeInstalls" network; otherwise the
   *         leading word of a campaign shaped like "word text (123)"; otherwise the campaign
   *         itself when it is all digits; `None` in every other case
   */
  def extract(name: String, network: Option[String], campaign: Option[String]): Option[String] =
    (name, network, campaign) match {
      case (AdjustAttributionEvent, Some("YouTubeInstalls"), _) => Some("5217")
      case (AdjustAttributionEvent, _, Some(CreativePattern(leadingWord, _))) => Some(leadingWord)
      case (AdjustAttributionEvent, _, Some(IntegerPattern(plainId))) => Some(plainId)
      case _ => None
    }
}
/** Passes `o` through when `m` equals `n`; yields `None` otherwise. */
def optionWhen[T](m: String, n: String, o: Option[T]): Option[T] =
  if (m != n) None
  else o
/** 1 when `m` equals `n`, 0 otherwise (suitable for summing into a count). */
def eventExists(m: String, n: String): Int =
  if (m != n) 0
  else 1
/**
 * 1 when `n` is the "page_view" event and `page` holds exactly `mPage`, 0 otherwise.
 *
 * @param mPage page name to look for
 * @param n     event name
 * @param page  page carried by the event, if any
 */
def pageExists(mPage: String, n: String, page: Option[String]): Int =
  if (n == "page_view" && page.exists(_ == mPage)) 1 else 0
// case class
/**
 * Flat, per-event record produced by the projection step of the job; one
 * instance per input event that survives filtering.
 *
 * The sign-up fields are populated only for "sign_up" events. The `*PageView`
 * and `purchase` fields are 0/1 flags meant to be summed per visit.
 */
case class ProjectedFields(
  // event identity
  name: String,
  timestamp: Long,
  // visit and client identity
  visitId: String,
  visitTimestamp: Long,
  clientId: String,
  clientTimestamp: Long,
  userId: Option[Int],
  assignmentId: String,
  ipAddress: Option[String],
  // values read from event properties
  appName: Option[String],
  clientPlatform: Option[String],
  referralKey: Option[String],
  preferredLanguage: Option[String],
  // sign-up details
  signupReferral: Option[String],
  signupUserId: Option[Int],
  signupTimestamp: Option[Long],
  // 0/1 flags
  purchasePageView: Int,
  purchase: Int,
  splashPageView: Int,
  introductionPageView: Int,
  signupPageView: Int,
  // referral derived from Adjust attribution events
  adjustReferral: Option[String]
)
/**
 * Per-visit record. Field names mirror the columns produced by the
 * group-by/aggregate step of the job (group keys, first-non-null values,
 * summed counters, visit bounds and the two derived columns).
 */
case class VisitFact(
  // group-by keys and visitor identity
  visitId: String,
  assignmentId: String,
  ipAddress: Option[String],
  visitTimestamp: Long,
  clientId: String,
  clientTimestamp: Long,
  userId: Option[Long],
  // sign-up details
  signupUserId: Option[Long],
  signupReferral: Option[String],
  signupTimestamp: Option[Long],
  adjustReferral: Option[String],
  // client splits
  appName: String,
  clientPlatform: Option[String],
  preferredLanguage: Option[String],
  // summed counters
  purchases: Option[Long],
  purchasePageViews: Option[Long],
  // latest / earliest event timestamp within the visit
  visitEndedAt: Long,
  visitStartedAt: Long,
  // derived columns
  visitType: String,
  isUnregistered: Option[Boolean]
)
// constants and vals
// App names an event must carry to be kept by the job's filter.
val IosAppNames: Set[String] = Set("Lumosity Mobile", "Lumosity Mobile iPad")

// Event names that are dropped by the job's filter.
val BlackListEvents: Set[String] = Set("split_test_assigned", "bulk_email_unsubscribe")

// Columns that together identify one visit in the aggregation.
val GroupByKeys: List[String] = List("visitId", "assignmentId", "ipAddress", "visitTimestamp")
// spark job
//val events = sc.textFile("/mnt/lumos-data-dump-dev01/andy/test/events_small/")
// Raw event lines for a single day (the path is hard-coded).
val events = sc.textFile("/mnt/lumos-events/yyyy=2017/mm=03/dd=11")

// Parse every line, keep valid, non-blacklisted iOS events that carry all required
// identifiers, and flatten each one into a ProjectedFields row.
val projectedFields = events.map(Event(_))
  .filter(event => {
    event.isValid &&
      !BlackListEvents.contains(event.name) &&
      IosAppNames.contains(event.appName) &&
      event.visit.id.isDefined &&
      event.visit.timestamp.isDefined &&
      event.client.id.isDefined &&
      event.client.timestamp.isDefined &&
      event.assignmentId.isDefined &&
      // The projection below unwraps the raw timestamps with .get, so they must be
      // checked here too; otherwise a single event without one fails the whole job.
      event.header.rawTimestamp.isDefined &&
      event.visit.rawTimestamp.isDefined &&
      event.client.rawTimestamp.isDefined
  })
  .map(event => {
    // read once, used by the three page-view flags
    val page = event.event.property("page").asString
    ProjectedFields(
      name = event.name,
      timestamp = event.header.rawTimestamp.get,
      visitId = event.visit.id.get,
      visitTimestamp = event.visit.rawTimestamp.get,
      clientId = event.client.id.get,
      clientTimestamp = event.client.rawTimestamp.get,
      userId = event.userId,
      assignmentId = event.assignmentId.get,
      ipAddress = event.ipAddress,
      appName = event.event.property("app_name").asString,
      clientPlatform = event.event.property("client_platform").asString,
      referralKey = event.event.property("referral_key").asString,
      preferredLanguage = event.preferredLanguage,
      // sign-up fields are only populated for "sign_up" events
      signupReferral = optionWhen[String]("sign_up", event.name, event.event.property("referral_key").asString),
      signupUserId = optionWhen[Int]("sign_up", event.name, event.userId),
      signupTimestamp = optionWhen[Long]("sign_up", event.name, event.header.rawTimestamp),
      // 0/1 flags, summed per visit later
      purchasePageView = eventExists("purchase_page_view", event.name),
      purchase = eventExists("purchase", event.name),
      splashPageView = pageExists("Splash", event.name, page),
      introductionPageView = pageExists("Introduction", event.name, page),
      signupPageView = pageExists("Signup", event.name, page),
      adjustReferral = AdjustReferralExtract.extract(
        event.name,
        event.event.property("network").asString,
        event.event.property("campaign").asString
      )
    )
  }).toDS
projectedFields.printSchema()

// Columns for which the first non-null value within a visit is kept, under the same name.
val firstNonNullColumns = Seq(
  "clientId", "clientTimestamp", "userId", "signupUserId", "signupReferral",
  "signupTimestamp", "adjustReferral", "appName", "clientPlatform", "preferredLanguage"
)

// Full aggregate list, in output-column order: first-non-null values, summed
// counters, then the latest and earliest event timestamp of the visit.
val visitAggregates =
  firstNonNullColumns.map(columnName => first(columnName, ignoreNulls = true).alias(columnName)) ++ Seq(
    sum("purchase").alias("purchases"),
    sum("purchasePageView").alias("purchasePageViews"),
    max("timestamp").alias("visitEndedAt"),
    min("timestamp").alias("visitStartedAt")
  )

// Roll events up to one row per visit, add the derived columns and preview 20 rows.
projectedFields
  .groupBy(GroupByKeys.head, GroupByKeys.tail: _*)
  .agg(visitAggregates.head, visitAggregates.tail: _*)
  .withColumn("visitType", visitType($"clientTimestamp", $"visitTimestamp"))
  .withColumn("isUnregistered", isUnregistered($"userId", $"signupUserId"))
  .take(20)

# group bys

for sure

  • visit_id / visit_timestamp
  • ll_source - web only, undefined for mobile -- also, derive visit_referral_key from it
  • ll_channel - web only, undefined for mobile
  • ip_address/country_code

maybe???

  • assignment_id

metrics

signup metrics

  • sign_up_user_id
  • signup referral key (if a signup happens, extract referral key from there)
  • client_id / client_timestamp
  • is facebook signup
  • signup timestamp

product metrics

  • purchases
  • plan_name
  • product_name
  • fx_rate
  • conversion_type
  • usd_purchase_amt

pages

  • purchase_page_view -- this is web only?
  • billing_page_view
  • signup_page_view

landing metrics

  • landing_page_id, landing_experience_id, landing_experience_set_id
  • ll_ad_id
  • creative_id

splits

  • app_name
  • client_platform
  • preferred_language
  • dma
  • subaffiliate_id

funnel marking splits

  • is_funnel_top - currently web only, undefined/open question for mobile
  • is_new_visit - currently web only, undefined/open question for mobile
  • is unregistered visitor - literally, the visit starts off w/ no user_id

split test info

  • ste roll up -- maybe count by id, variant, mark by pre-reg, pre-conv
  • sta roll up?

other

  • events list?
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.TypedColumn
import org.apache.spark.sql.functions.udf
import java.time.Instant
import spark.implicits._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe._
// functions
/** Passes `o` through when `m` equals `n`; yields `None` otherwise. */
def optionWhen[T](m: String, n: String, o: Option[T]): Option[T] =
  if (m != n) None
  else o
// additional functions to do extractions by predicate
/**
 * Reads property `prop` from `event` as a `T`, but only when the event is named
 * `eventName`; any other event yields `None`.
 */
def eventPropertyForEvent[T: TypeTag: ClassTag](eventName: String)(prop: String)(event: Event): Option[T] =
  if (event.name != eventName) None
  else event.event.propertyVal[T](prop)
/** 1 when `event` is named `eventName`, 0 otherwise (suitable for summing into a count). */
def eventExists(eventName: String)(event: Event): Int =
  if (event.name != eventName) 0
  else 1
/**
 * 1 when `event` is a "page_view" whose "page" property equals `page`, 0 otherwise.
 *
 * @param page  page name to look for
 * @param event event to inspect
 */
def pageExists(page: String)(event: Event): Int = {
  val extractedPage = event.event.property("page").asString
  if (event.name == "page_view" && extractedPage.exists(_ == page)) 1 else 0
}
// case class
/**
 * Flat, per-event record produced by the projection step of the job; one
 * instance per input event that survives filtering.
 *
 * The sign-up fields are populated only for "sign_up" events and the adjust
 * fields only for "adjust_attribution_received" events. The `*PageView` and
 * `purchase` fields are 0/1 flags meant to be summed per visit.
 */
case class ProjectedFields(
  // event identity
  name: String,
  timestamp: Long,
  // visit and client identity
  visitId: String,
  visitTimestamp: Long,
  clientId: String,
  clientTimestamp: Long,
  userId: Option[Int],
  assignmentId: String,
  ipAddress: Option[String],
  // values read from event properties
  appName: Option[String],
  clientPlatform: Option[String],
  referralKey: Option[String],
  preferredLanguage: Option[String],
  // sign-up details
  signupReferral: Option[String],
  signupUserId: Option[Int],
  signupTimestamp: Option[Long],
  // 0/1 flags
  purchasePageView: Int,
  purchase: Int,
  splashPageView: Int,
  introductionPageView: Int,
  signupPageView: Int,
  // Adjust attribution properties
  adjustReferral: Option[String],
  adjustNetwork: Option[String],
  adjustCampaign: Option[String]
)
/**
 * Per-visit record that the aggregated rows are converted to with `.as[VisitFact]`.
 * Field names mirror the aggregation's output columns (group keys,
 * first-non-null values, summed counters, visit bounds and the two derived columns).
 */
case class VisitFact(
  // group-by keys and visitor identity
  visitId: String,
  assignmentId: String,
  ipAddress: Option[String],
  visitTimestamp: Long,
  clientId: String,
  clientTimestamp: Long,
  userId: Option[Long],
  // sign-up details
  signupUserId: Option[Long],
  signupReferral: Option[String],
  signupTimestamp: Option[Long],
  // Adjust attribution
  adjustReferral: Option[String],
  adjustNetwork: Option[String],
  adjustCampaign: Option[String],
  // client splits
  appName: String,
  clientPlatform: Option[String],
  preferredLanguage: Option[String],
  // summed counters
  purchases: BigInt,
  purchasePageViews: BigInt,
  splashPageViews: BigInt,
  introductionPageViews: BigInt,
  signupPageViews: BigInt,
  // latest / earliest event timestamp within the visit
  visitEndedAt: Long,
  visitStartedAt: Long,
  // derived columns: "new"/"returning", and a 1/0 flag
  visitType: String,
  isUnregistered: Int
)
// constants and vals
// App names an event must carry to be kept by the job's filter.
val IosAppNames: Set[String] = Set("Lumosity Mobile", "Lumosity Mobile iPad")

// Event names that are dropped by the job's filter.
val BlackListEvents: Set[String] = Set("split_test_assigned", "bulk_email_unsubscribe")

// A visit starting less than this many milliseconds after the client timestamp is "new".
val NewVisitThresholdInMillis: Int = 150000

// Columns that together identify one visit in the aggregation.
val GroupByKeys: List[String] = List("visitId", "assignmentId", "ipAddress", "visitTimestamp")
// spark job
//val events = sc.textFile("/mnt/lumos-data-dump-dev01/andy/test/events_small/")
// Raw event lines for a single day (the path is hard-coded).
val events = sc.textFile("/mnt/lumos-events/yyyy=2017/mm=03/dd=11")

// Parse every line, keep valid, non-blacklisted iOS events that carry all required
// identifiers, and flatten each one into a ProjectedFields row.
val projectedFields = events.map(Event(_))
  .filter(event => {
    event.isValid &&
      !BlackListEvents.contains(event.name) &&
      IosAppNames.contains(event.appName) &&
      event.visit.id.isDefined &&
      event.visit.timestamp.isDefined &&
      event.client.id.isDefined &&
      event.client.timestamp.isDefined &&
      event.assignmentId.isDefined &&
      // The projection below unwraps the raw timestamps with .get, so they must be
      // checked here too; otherwise a single event without one fails the whole job.
      event.header.rawTimestamp.isDefined &&
      event.visit.rawTimestamp.isDefined &&
      event.client.rawTimestamp.isDefined
  })
  .map(event => ProjectedFields(
    name = event.name,
    timestamp = event.header.rawTimestamp.get,
    visitId = event.visit.id.get,
    visitTimestamp = event.visit.rawTimestamp.get,
    clientId = event.client.id.get,
    clientTimestamp = event.client.rawTimestamp.get,
    userId = event.userId,
    assignmentId = event.assignmentId.get,
    ipAddress = event.ipAddress,
    appName = event.event.property("app_name").asString,
    clientPlatform = event.event.property("client_platform").asString,
    referralKey = event.event.property("referral_key").asString,
    preferredLanguage = event.preferredLanguage,
    // sign-up fields are only populated for "sign_up" events
    signupReferral = eventPropertyForEvent[String]("sign_up")("referral_key")(event),
    signupUserId = eventPropertyForEvent[Int]("sign_up")("user_id")(event),
    signupTimestamp = optionWhen[Long]("sign_up", event.name, event.header.rawTimestamp),
    // 0/1 flags, summed per visit later
    purchasePageView = eventExists("purchase_page_view")(event),
    purchase = eventExists("purchase")(event),
    splashPageView = pageExists("Splash")(event),
    introductionPageView = pageExists("Introduction")(event),
    signupPageView = pageExists("Signup")(event),
    // attribution fields are only populated for "adjust_attribution_received" events
    adjustReferral = eventPropertyForEvent[String]("adjust_attribution_received")("referral_key")(event),
    adjustNetwork = eventPropertyForEvent[String]("adjust_attribution_received")("network")(event),
    adjustCampaign = eventPropertyForEvent[String]("adjust_attribution_received")("campaign")(event)
  )).toDS
// Columns for which the first non-null value within a visit is kept, under the same name.
val firstNonNullColumns = Seq(
  "clientId", "clientTimestamp", "userId", "signupUserId", "signupReferral",
  "signupTimestamp", "adjustReferral", "adjustNetwork", "adjustCampaign",
  "appName", "clientPlatform", "preferredLanguage"
)

// Full aggregate list, in output-column order: first-non-null values, summed
// counters, then the latest and earliest event timestamp of the visit.
val visitAggregates =
  firstNonNullColumns.map(columnName => first(columnName, ignoreNulls = true).alias(columnName)) ++ Seq(
    sum("purchase").alias("purchases"),
    sum("purchasePageView").alias("purchasePageViews"),
    sum("splashPageView").alias("splashPageViews"),
    sum("introductionPageView").alias("introductionPageViews"),
    sum("signupPageView").alias("signupPageViews"),
    max("timestamp").alias("visitEndedAt"),
    min("timestamp").alias("visitStartedAt")
  )

// "new" when the visit started within the threshold of the client timestamp, else "returning".
val visitTypeColumn =
  when($"visitTimestamp" - $"clientTimestamp" < NewVisitThresholdInMillis, "new").otherwise("returning")

// 1 when the visit has no user id or a sign-up user id is present, else 0.
val isUnregisteredColumn =
  when($"userId".isNull || $"signupUserId".isNotNull, 1).otherwise(0)

// Roll events up to one typed, cached row per visit.
val aggregatedVisitFacts = projectedFields
  .groupBy(GroupByKeys.head, GroupByKeys.tail: _*)
  .agg(visitAggregates.head, visitAggregates.tail: _*)
  .withColumn("visitType", visitTypeColumn)
  .withColumn("isUnregistered", isUnregisteredColumn)
  .as[VisitFact]
  .cache()
//aggregatedVisitFacts.take(10)

// Sanity check: number of visits per app name and client platform.
aggregatedVisitFacts.groupBy($"appName", $"clientPlatform").count().collect()
@jz042
Copy link

jz042 commented Mar 13, 2017

Some general comments:

  • The split info section might be too much (how would this rollup work? since a user might get assigned upwards of 10 split tests and experience 5). If these end up in different rows, then it makes the table a bit more difficult to use. Also I'm not sure how much value it would add since for most split tests the things they look for are quite specific.
  • Would the events list include all events the user experiences during the entire visit? Would this just be a JSON field?
  • Product metrics: Instead of all these columns can we just include subscription_id? What does purchases mean? Would a user make multiple purchases during a visit?

I went through the entire list here to mark the columns as keep or not keep, but open to suggestions (also getting the acquisitions team to review):
https://docs.google.com/spreadsheets/d/1Bdudm6sCA-0N7Dg1GD1cv2lgNjam_BtIqrWkfOFC1FU/edit#gid=0

@dollschasingmen
Copy link
Author

dollschasingmen commented Mar 16, 2017

Would the events list include all events the user experience during the entire visit? Would this just be a JSON field?

forget i suggested this, this is a dumb idea

Product metrics: Instead of all these columns can we just include subscription_id? What does purchases mean? Would a user make multiple purchases during a visit?

alas -- no :( unless people start firing subscription_id in the purchase event. visit_facts gets all the product info from within the purchase event. i think a visitor could make multiple purchases, right? get a sub for themselves and maybe for someone else? although not often.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment