Created March 12, 2015 06:02
-
-
Save jarutis/59b3d0c1347c9279d16f to your computer and use it in GitHub Desktop.
DataFrames API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Start an interactive Spark shell on YARN (client mode): 50 single-core executors
# with 7 GB each, plus Joda-Time/Joda-Convert and the vinted math jar on the classpath.
./bin/spark-shell \
  --master yarn-client \
  --num-executors 50 \
  --driver-memory 7g \
  --executor-memory 7g \
  --executor-cores 1 \
  --jars /home/jjarutis/ini/joda-time-2.7/joda-time-2.7.jar,/home/jjarutis/ini/joda-time-2.7/joda-convert-1.7.jar,/home/jjarutis/math_2.10-0.1.0.jar
import org.joda.time.{LocalDate, Days} | |
import org.apache.spark.sql.DataFrame | |
import org.apache.spark.sql.Row | |
import org.apache.spark.rdd.RDD | |
import vinted.math.{RolingSum, TotalSum} | |
// Helpers

// SQLContext built on the shell's SparkContext `sc`; used for every Parquet read below.
val context: org.apache.spark.sql.SQLContext = new org.apache.spark.sql.SQLContext(sc)
/** Loads one portal's copy of an imported MySQL table.
  *
  * The data is read as Parquet from
  * `/user/hive/warehouse/mysql_imports.db/<portal>__<name>`.
  *
  * @param name   table name as used in the import directory
  * @param portal portal code prefixing the directory name (e.g. "fr")
  */
def readMysqlTable(name: String, portal: String): DataFrame =
  context.parquetFile(s"/user/hive/warehouse/mysql_imports.db/${portal}__${name}")
/** Loads table `name` for every portal in `portals` and unions the results.
  *
  * @param name    table name as used in the import directory
  * @param portals portal codes to read; resolved implicitly (the `portals` value) by default
  * @return the union of all requested portals' copies of the table
  * @throws IllegalArgumentException if `portals` is empty
  */
def readTables(name: String)(implicit portals: List[String]): DataFrame = {
  // Fail with a clear message instead of List.reduce's opaque "empty.reduceLeft".
  require(portals.nonEmpty, s"readTables($name): at least one portal is required")
  portals.map(portal => readMysqlTable(name, portal)).reduce(_ unionAll _)
}
// Portals read by default; picked up implicitly by readTables.
implicit val portals: List[String] = List("uk", "fr", "de")
///////////////////////////////////////////////////////////////////////// | |
// Job | |
// Transactions of all default portals, unioned into one DataFrame.
val transactions: DataFrame = readTables("transactions")
/** Aggregates transactions per (portal, user, sale day) for one side of the sale.
  *
  * @param df     transactions; must contain portal_id, sale_date, gmv and `target`
  * @param target user column to group by (e.g. "buyer_id" or "seller_id")
  * @param items  output column name for the count of non-null gmv values
  * @param gmv    output column name for the summed gmv
  * @return portal_id, user_id, session_date, `gmv`, `items`
  */
def aggregateTransactions(df: DataFrame, target: String, items: String, gmv: String) = {
  // NOTE(review): the grouping columns are repeated in agg() because Spark 1.3 does not
  // keep them automatically; Spark 1.4+ does by default and would then emit them twice.
  val gmvColumn = col("gmv")
  df.groupBy("portal_id", target, "sale_date").agg(
    col("portal_id"),
    col(target).as("user_id"),
    col("sale_date").as("session_date"),
    sum(gmvColumn).as(gmv),
    count(gmvColumn).as(items))
}
// Per (portal, user, day) purchase totals (user = buyer) and sales totals (user = seller).
val boughtItems =
  aggregateTransactions(transactions, target = "buyer_id", items = "items_bought", gmv = "gmv_bought")
val soldItems =
  aggregateTransactions(transactions, target = "seller_id", items = "items_sold", gmv = "gmv_sold")
// Days on which a user had a session: the first 10 characters of session_start
// (presumably "yyyy-MM-dd" — confirm against the user_ips schema).
// NOTE(review): unlike the transaction tables, only the "fr" portal is read here, so for
// other portals only transaction days count as active — confirm this is intended.
val sessionPortals = List("fr")
val sessions = readTables("user_ips")(sessionPortals).
  // Spark substr positions are 1-based (position 0 is treated as 1).
  withColumn("session_date", $"session_start".substr(1, 10)).
  select("portal_id", "user_id", "session_date")
// Every distinct (portal_id, user_id, session_date) on which a user had a session,
// bought something, or sold something.
val activeDates = Seq(
  sessions,
  boughtItems.select("portal_id", "user_id", "session_date"),
  soldItems.select("portal_id", "user_id", "session_date")
).reduce(_ unionAll _).distinct
// Each active day joined with that day's purchase and sales totals. Days without
// purchases (or sales) get zero counts and zero GMV; rows with a null user_id are dropped.
// The result is written to /user/jjarutis/activity.
val activity = {
  // Equality on (portal_id, session_date, user_id) between alias "a" and alias `other`.
  def sameKey(other: String) =
    Seq("portal_id", "session_date", "user_id").
      map(c => $"a.$c" === $"$other.$c").
      reduce(_ && _)

  activeDates.as("a").
    join(boughtItems.as("b"), sameKey("b"), "left_outer").
    join(soldItems.as("c"), sameKey("c"), "left_outer").
    select(
      $"a.portal_id",
      $"a.session_date",
      $"a.user_id",
      coalesce($"items_bought", lit(0)) as "items_bought",
      coalesce($"gmv_bought", lit(0.0)) as "gmv_bought",
      coalesce($"items_sold", lit(0)) as "items_sold",
      coalesce($"gmv_sold", lit(0.0)) as "gmv_sold").
    where($"user_id".isNotNull)
}
activity.saveAsParquetFile("/user/jjarutis/activity")
///////////////////////////////////////////////////////////////////////// | |
// Rolling sum calculation
// Window length, in days, passed to each RolingSum below.
val WINDOW = 90

/** Number of whole calendar days from 1970-01-01 to `date`.
  *
  * The count is computed on calendar dates. The earlier form,
  * `new LocalDate(date).toDate.getTime / 1000 / 60 / 60 / 24`, depended on the JVM
  * default time zone: `toDate` yields local midnight, so in zones whose UTC offset is
  * 0 in winter and +1 in summer (e.g. Europe/London), summer dates came out one day
  * earlier than winter dates. Day differences across a DST change were then off by
  * one. Integer division also truncated toward zero for dates before 1970.
  *
  * @param date ISO-8601 date string such as "2015-03-12"
  * @return days since the Unix epoch (negative for earlier dates)
  */
def daysSinceEpoch(date: String): Long =
  Days.daysBetween(new LocalDate(1970, 1, 1), new LocalDate(date)).getDays.toLong
/** Per (portal, user, active day) activity statistics.
  *
  * The field names become the Parquet column names of /user/jjarutis/activity_roll
  * and are read back by name later, so they (including the "roling" spelling)
  * must not be renamed.
  *
  * roling_* fields hold RolingSum (windowed) values; total_* fields hold TotalSum values.
  */
case class RolingStats(
    portal_id: String, date: String, user_id: Int,
    roling_items_bought: Double, roling_gmv_bought: Double,
    roling_items_sold: Double, roling_gmv_sold: Double,
    total_items_bought: Double, total_gmv_bought: Double,
    total_items_sold: Double, total_gmv_sold: Double)
// Reload the activity table from Parquet. This shadows the earlier `activity` value,
// so the rolling-sum step can also be run on its own.
val activity = context.parquetFile("/user/jjarutis/activity")

// One RolingStats row per (portal, user, active day). Each row carries:
//  - the value of a RolingSum(WINDOW) accumulator (vinted.math; its exact windowing
//    semantics live in that library — presumably a sum over the last WINDOW days);
//  - the value of a TotalSum accumulator,
// for items/GMV bought and sold.
//
// Rows whose session_date is the literal string "NULL" are dropped. Rows with a SQL
// NULL session_date are dropped too, because `!==` against NULL is not true.
// Row positions follow the column order `activity` was written with:
// portal_id, session_date, user_id, items_bought, gmv_bought, items_sold, gmv_sold.
// A single flatMap emits each user's rows directly; previously map(...).flatMap(identity)
// built an RDD of Lists only to flatten it in a second step.
val rolingSums = activity.
  where($"session_date" !== "NULL").
  rdd.
  map { r =>
    (r.getString(0), r.getString(1), r.getInt(2),
     r.getLong(3), r.getDouble(4), r.getLong(5), r.getDouble(6))
  }.
  groupBy { case (portal, _, user, _, _, _, _) => (portal, user) }.
  flatMap { case (_, values) =>
    // The accumulators are stateful, so each user's days must be fed oldest first.
    // NOTE(review): assumes session_date strings are "yyyy-MM-dd", which sort chronologically.
    val data = values.toList.sortBy { case (_, date, _, _, _, _, _) => date }

    val rollItemsBought = new RolingSum(WINDOW)
    val rollGmvBought = new RolingSum(WINDOW)
    val rollItemsSold = new RolingSum(WINDOW)
    val rollGmvSold = new RolingSum(WINDOW)
    val totalItemsBought = new TotalSum
    val totalGmvBought = new TotalSum
    val totalItemsSold = new TotalSum
    val totalGmvSold = new TotalSum

    // List.map is strict and in order, so accumulator updates follow the sorted dates.
    data.map { case (portal, date, user, itemsBought, gmvBought, itemsSold, gmvSold) =>
      val days = daysSinceEpoch(date)
      RolingStats(portal,
        date,
        user,
        rollItemsBought(itemsBought, days),
        rollGmvBought(gmvBought, days),
        rollItemsSold(itemsSold, days),
        rollGmvSold(gmvSold, days),
        totalItemsBought(itemsBought),
        totalGmvBought(gmvBought),
        totalItemsSold(itemsSold),
        totalGmvSold(gmvSold))
    }
  }

rolingSums.toDF.saveAsParquetFile("/user/jjarutis/activity_roll")
///////////////////////////////////////////////////////////////////////// | |
// Activity dimensions | |
// Buyer segment from the windowed item count (`windowed`) and the total item count
// (`total`). A total below 0.001 means the user has not bought anything yet.
// Comparisons with NaN are all false, so a NaN `windowed` matches no bucket and
// raises MatchError, as the original guard-only match did.
val boughtItemsSegment = udf { (windowed: Double, total: Double) =>
  if (total < 0.001) "New shopper"
  else if (windowed <= 2) "Occasional shopper"
  else if (windowed <= 5) "Regular shopper"
  else if (windowed > 5) "Addicted shopper"
  else throw new MatchError((windowed, total))
}

// Seller segment; same scheme with seller thresholds (2 and 8).
val soldItemsSegment = udf { (windowed: Double, total: Double) =>
  if (total < 0.001) "New seller"
  else if (windowed <= 2) "Occasional seller"
  else if (windowed <= 8) "Regular seller"
  else if (windowed > 8) "Heavy seller"
  else throw new MatchError((windowed, total))
}
// Reload the rolling statistics and label each (portal, user, day) with buyer and
// seller segments, writing the result to /user/jjarutis/activity_segments.
val rolingSums = context.parquetFile("/user/jjarutis/activity_roll")

val activitySegments = rolingSums.select(
  $"portal_id",
  $"date",
  $"user_id",
  $"roling_items_bought",
  $"roling_items_sold",
  boughtItemsSegment($"roling_items_bought", $"total_items_bought") as "buyer_activity",
  soldItemsSegment($"roling_items_sold", $"total_items_sold") as "seller_activity")

activitySegments.saveAsParquetFile("/user/jjarutis/activity_segments")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.