Skip to content

Instantly share code, notes, and snippets.

@marcovivero
Last active January 13, 2016 07:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save marcovivero/97137abc028a2e73e61c to your computer and use it in GitHub Desktop.
package conversions
import scala.math.max
import scala.math.min
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.UserDefinedFunction
import org.joda.time.DateTime
// Builds per-user 12-month customer value (CV) from four raw CSV exports
// (ads, conversions/purchases, signups, views) using Spark DataFrames.
// NOTE(review): all CSVs are parsed with a naive `split(",")` and no header
// row is skipped — assumes headerless files with no quoted/embedded commas
// (except registerCountry, handled below); confirm against the source data.
class Users(
sc: SparkContext)
extends Serializable {
val sqlContext = new SQLContext(sc)
// timeFormatter is project-local (Common); presumably a Joda DateTimeFormatter
// matching the CSV timestamp layout — TODO confirm its pattern.
import Common.timeFormatter
import sqlContext.implicits._
import Users._
// Get ad data: one AdInfo row per user, deduplicated on user id.
val adData: DataFrame = sc
.textFile("<FILE_PATH>/users_ads.csv")
.map(line => {
// Extract CSV fields.
val values: Array[String] = line.split(",")
// Extract user id.
val id: String = values(0)
// Extract traffic source.
val trafficSource: String = values(1)
// Extract ad campaign.
val adCampaign: String = values(2)
// Extract ad medium.
val adMedium: String = values(3)
// Extract search keyword.
val searchKeyword: String = values(4)
// Extract ad content.
val adContent: String = values(5)
AdInfo(id, trafficSource, adCampaign, adMedium, searchKeyword, adContent)
})
.toDF // Convert to Spark DataFrame.
.dropDuplicates(Seq("id")) // Deduplicate data: keeps an arbitrary row per id.
// Get purchase data: one PurchaseEvent (userId, revenue, purchaseTime) per line.
val purchaseData: DataFrame = sc
.textFile("<FILE_PATH>/conversions.csv")
.map(line => {
// Extract CSV fields.
val values: Array[String] = line.split(",")
// Extract user id.
val userId: String = values(0)
// Extract price at which item was sold.
// NOTE(review): `toDouble` throws on malformed numerics — no bad-row handling.
val price: Double = values(2).toDouble
// Extract item quantity sold.
val quantity: Double = values(3).toDouble
// Revenue for this purchase event = unit price * quantity.
val revenue: Double = price * quantity
// Extract purchase timestamp.
val purchaseTimestamp: String = values(4)
// Purchase time as fractional epoch seconds (millis / 1000).
val purchaseTime: Double = timeFormatter
.parseDateTime(purchaseTimestamp)
.getMillis
.toDouble / 1000
PurchaseEvent(userId, revenue, purchaseTime)
})
.toDF // Convert to Spark DataFrame.
// Get signup data: one SignupEvent per user with calendar features.
val signupData: DataFrame = sc
.textFile("<FILE_PATH>/users.csv")
.map(line => {
// Extract CSV fields.
val values: Array[String] = line.split(",")
val valuesSize: Int = values.size
// Extract user id.
val id: String = values(0)
// Some register country fields contain commas, and thus we must collapse
// the inner components (everything between the id and the trailing
// timestamp) back into a single register-country value.
// NOTE(review): the pieces are rejoined with "-" rather than the original
// "," — verify the dash separator is intentional, since it changes the
// country string for affected rows.
val registerCountry: String = values
.slice(1, valuesSize - 1)
.mkString("-")
// Extract timestamp (always the last field).
val signupTimestamp: String = values(valuesSize - 1)
// Parse DateTime from timestamp.
val signupDateTime: DateTime = timeFormatter
.parseDateTime(signupTimestamp)
// Signup time as fractional epoch seconds (millis / 1000).
val signupTime: Double = signupDateTime
.getMillis
.toDouble / 1000
// Extract day of year, week of year, and month of year as strings, for
// later use as categorical features.
val dayOfYear: String = signupDateTime
.getDayOfYear
.toString
val weekOfYear: String = signupDateTime
.getWeekOfWeekyear
.toString
val monthOfYear: String = signupDateTime
.getMonthOfYear
.toString
SignupEvent(id, registerCountry, signupTime, dayOfYear, weekOfYear, monthOfYear)
})
.toDF // Convert to Spark DataFrame.
// Get view data: one ViewEvent (userId, viewTime) per line.
val viewData: DataFrame = sc
.textFile("<FILE_PATH>/views.csv")
.map(line => {
// Extract CSV fields.
val values = line.split(",")
// Extract user id.
val userId: String = values(0)
// Extract view event timestamp.
val viewTimestamp: String = values(2)
// View time as fractional epoch seconds (millis / 1000).
val viewTime: Double = timeFormatter
.parseDateTime(viewTimestamp)
.getMillis
.toDouble / 1000
ViewEvent(userId,viewTime)
})
.toDF // Convert to Spark DataFrame
// Earliest and latest recorded purchase times. The column is duplicated
// because the Map-based `agg` allows only one aggregate per column name,
// so min and max are computed over two copies of the same values.
private val purchaseTimes: MinMaxTime = purchaseData
.withColumn("purchaseTimeCopy", $"purchaseTime")
.agg(Map("purchaseTime" -> "min", "purchaseTimeCopy" -> "max"))
.map(row => {
val minTime = row.getAs[Double]("min(purchaseTime)")
val maxTime = row.getAs[Double]("max(purchaseTimeCopy)")
MinMaxTime(minTime, maxTime)
})
.collect
.apply(0)
// Earliest and latest recorded view times (same copy-column trick as above).
private val viewTimes: MinMaxTime = viewData
.withColumn("viewTimeCopy", $"viewTime")
.agg(Map("viewTime" -> "min", "viewTimeCopy" -> "max"))
.map(row => {
val minTime = row.getAs[Double]("min(viewTime)")
val maxTime = row.getAs[Double]("max(viewTimeCopy)")
MinMaxTime(minTime, maxTime)
})
.collect
.apply(0)
// Overall activity window across both purchases and views, in epoch seconds.
// NOTE(review): `.apply(0)` above and these fields assume both datasets are
// non-empty; empty input would fail at collect time.
val minActivityTime: Double = min(purchaseTimes.minTime, viewTimes.minTime)
val maxActivityTime: Double = max(purchaseTimes.maxTime, viewTimes.maxTime)
// Length of 365 days in seconds (no leap-year adjustment).
private val yearInSeconds: Double = 365 * 24 * 60 * 60
// UDF factory: keep only users who signed up inside the observed activity
// window AND at least a full year before the last observed activity, so a
// complete 12-month CV window exists for every retained user.
val activityFilterUDF: (Double, Double, Double) => UserDefinedFunction = {
(minActivityTime: Double, maxActivityTime: Double, yearInSeconds: Double) => functions.udf({
(signupTime: Double) => {
signupTime >= minActivityTime && maxActivityTime - signupTime >= yearInSeconds
}
})
}
// UDF factory: pass through a purchase's revenue only when the purchase
// happened within [0, yearInSeconds] of signup; otherwise contribute 0.0.
val cvUDF: Double => UserDefinedFunction = (yearInSeconds: Double) => functions.udf({
(revenue: Double, purchaseTime: Double, signupTime: Double) => {
val timeDiff: Double= purchaseTime - signupTime
if (timeDiff >= 0 && timeDiff <= yearInSeconds) {
revenue
} else {
0.0
}
}
})
// Users that satisfy the activity-window requirements above.
val validUsers: DataFrame = signupData
// Only keep users that satisfy activity requirements.
.filter(activityFilterUDF(minActivityTime, maxActivityTime, yearInSeconds)($"signupTime"))
// Per-(user, purchase) rows with a 12-month-restricted customerValue column.
// Left outer join keeps users with no purchases; na.fill gives them zero
// revenue/purchaseTime so cvUDF yields 0.0 for them.
val preCVData: DataFrame = validUsers
.join(purchaseData, validUsers("id") === purchaseData("userId"), "left_outer")
.na
.fill(Map("revenue" -> 0.0, "purchaseTime" -> 0.0))
.drop("userId")
// Restrict revenue within the 12 month time limit.
.withColumn("customerValue", cvUDF(yearInSeconds)($"revenue", $"purchaseTime", $"signupTime"))
// Final output: one row per user — signup features (constant per user, so
// `first` is safe), total 12-month customerValue, joined with ad info.
val cvData: DataFrame = preCVData
.groupBy("id")
.agg(
functions.first($"registerCountry").as("registerCountry"),
functions.first($"dayOfYear").as("dayOfYear"),
functions.first($"weekOfYear").as("weekOfYear"),
functions.first($"monthOfYear").as("monthOfYear"),
functions.sum($"customerValue").as("customerValue"))
.join(adData, "id") // Inner join: drops users with no ad record — confirm intended.
.drop("id")
}
object Users {
  /**
   * Entry point: deletes the previous output, computes per-user customer
   * value via [[Users]], and writes it out as Parquet plus a plain-text
   * customerValue listing.
   *
   * Fix: the SparkContext was previously never stopped, leaking the
   * context (and cluster resources) on both success and failure; the Spark
   * work is now wrapped in try/finally so `sc.stop()` always runs.
   *
   * NOTE(review): output cleanup shells out to `sudo rm -rf`; `.!!` throws
   * if the command exits non-zero. Consider Hadoop FileSystem APIs instead.
   */
  def main(args: Array[String]): Unit = {
    import scala.sys.process._
    printWrapper("Deleting old data.....")
    "sudo rm -rf <OUTPUT_FILE_PATH>/users.parquet/".!!
    printWrapper("............")
    "sudo rm -rf <OUTPUT_FILE_PATH>/customerValue.csv/".!!
    printWrapper("Done.")
    val sparkConf = Common.getSparkConf("Computing_CV")
    val sc: SparkContext = new SparkContext(sparkConf)
    try {
      val users: Users = new Users(sc)
      // Cache: cvData is consumed twice (Parquet write + text export).
      val cvData = users
        .cvData
        .cache
      printWrapper("Writing new data.....")
      cvData
        .write
        .parquet("<OUTPUT_FILE_PATH>/users.parquet")
      printWrapper("............")
      // One customerValue per line, as text.
      cvData
        .map(_.getAs[Double]("customerValue"))
        .saveAsTextFile("<OUTPUT_FILE_PATH>/customerValue.csv")
      printWrapper("New data written.")
    } finally {
      // Always release the SparkContext, even if a stage above fails.
      sc.stop()
    }
  }

  /** Prints `any` surrounded by blank lines so it stands out in driver logs. */
  def printWrapper(any: Any): Unit = {
    println()
    println(any)
    println()
  }

  /** Ad-attribution attributes for a single user (from users_ads.csv). */
  case class AdInfo(
    id: String,
    trafficSource: String,
    adCampaign: String,
    adMedium: String,
    searchKeyword: String,
    adContent: String)
    extends Serializable

  /** Min/max of an event-time column, in epoch seconds. */
  case class MinMaxTime(
    minTime: Double,
    maxTime: Double)
    extends Serializable

  /** A single purchase: revenue at `purchaseTime` (epoch seconds). */
  case class PurchaseEvent(
    userId: String,
    revenue: Double,
    purchaseTime: Double)
    extends Serializable

  /** A user signup with calendar features used downstream as categoricals. */
  case class SignupEvent(
    id: String,
    registerCountry: String,
    signupTime: Double,
    dayOfYear: String,
    weekOfYear: String,
    monthOfYear: String)
    extends Serializable

  /** A single page/item view at `viewTime` (epoch seconds). */
  case class ViewEvent(
    userId: String,
    viewTime: Double)
    extends Serializable
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment