Skip to content

Instantly share code, notes, and snippets.

@jarutis
Created March 12, 2015 06:02
Show Gist options
  • Save jarutis/59b3d0c1347c9279d16f to your computer and use it in GitHub Desktop.
Save jarutis/59b3d0c1347c9279d16f to your computer and use it in GitHub Desktop.
DataFrames api
./bin/spark-shell --master yarn-client --num-executors 50 --driver-memory 7g --executor-memory 7g --executor-cores 1 --jars /home/jjarutis/ini/joda-time-2.7/joda-time-2.7.jar,/home/jjarutis/ini/joda-time-2.7/joda-convert-1.7.jar,/home/jjarutis/math_2.10-0.1.0.jar
import org.joda.time.{LocalDate, Days}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
import vinted.math.{RolingSum, TotalSum}
// Helpers
// SQLContext built on `sc` (presumably the SparkContext that spark-shell provides);
// every parquetFile(...) load in this script goes through it.
val context = new org.apache.spark.sql.SQLContext(sc)
/**
 * Loads one portal's copy of an imported MySQL table from the Hive warehouse.
 *
 * The Parquet directory is `<portal>__<name>` under `mysql_imports.db`.
 *
 * @param name   table name (the part after the double underscore)
 * @param portal portal prefix of the directory, e.g. "uk"
 * @return the result of `context.parquetFile` for that directory
 */
def readMysqlTable(name: String, portal: String) =
  context.parquetFile(s"/user/hive/warehouse/mysql_imports.db/${portal}__$name")
/**
 * Reads the same table for every portal and stacks the results with `unionAll`.
 *
 * @param name    table name, forwarded to `readMysqlTable`
 * @param portals portals to read; taken from implicit scope unless passed explicitly
 * @return the union of all per-portal tables, in `portals` order
 * @throws UnsupportedOperationException if `portals` is empty (reduce over an empty list)
 */
def readTables(name: String)(implicit portals: List[String]) = {
  val perPortal = for (portal <- portals) yield readMysqlTable(name, portal)
  perPortal.reduce((combined, next) => combined.unionAll(next))
}
// Portal list resolved implicitly by readTables whenever no explicit list is supplied.
implicit val portals: List[String] = List("uk", "fr", "de")
/////////////////////////////////////////////////////////////////////////
// Job
// The `transactions` table of every portal in the implicit portal list, stacked together.
val transactions = readTables(name = "transactions")
/**
 * Summarises transactions per (portal_id, `target`, sale_date).
 *
 * Output columns, in order: `portal_id`, `user_id` (the `target` column renamed),
 * `session_date` (`sale_date` renamed), the sum of the `gmv` column and the count of
 * its values.
 *
 * @param df     transactions frame; must contain portal_id, sale_date, gmv and `target`
 * @param target name of the column identifying the user to aggregate for
 * @param items  output name for the count column
 * @param gmv    output name for the summed-gmv column (the input column is always "gmv")
 * @return one row per (portal_id, target, sale_date) group
 */
def aggregateTransactions(df: DataFrame, target: String, items: String, gmv: String): DataFrame = {
  val perUserAndDay = df.groupBy("portal_id", target, "sale_date")
  perUserAndDay.agg(
    $"portal_id",
    col(target).as("user_id"),
    $"sale_date".as("session_date"),
    sum($"gmv").as(gmv),
    count($"gmv").as(items))
}
// Per-day purchase totals, keyed by the buying user.
val boughtItems = aggregateTransactions(
  df = transactions, target = "buyer_id", items = "items_bought", gmv = "gmv_bought")
// Per-day sale totals, keyed by the selling user.
val soldItems = aggregateTransactions(
  df = transactions, target = "seller_id", items = "items_sold", gmv = "gmv_sold")
// One (portal_id, user_id, session_date) row per user_ips record; session_date is the
// leading 10 characters of session_start (presumably the yyyy-MM-dd part — confirm format).
// NOTE(review): only the "fr" portal is read here, while transactions use the implicit
// uk/fr/de list — confirm this restriction is intentional.
val sessions = {
  val visits = readTables("user_ips")(List("fr"))
  val visitsWithDay = visits.withColumn("session_date", $"session_start".substr(0, 10))
  visitsWithDay.select("portal_id", "user_id", "session_date")
}
// Every distinct (portal_id, user_id, session_date) on which a user had a session,
// a purchase or a sale.
val activeDates = {
  val purchaseDays = boughtItems.select("portal_id", "user_id", "session_date")
  val saleDays = soldItems.select("portal_id", "user_id", "session_date")
  sessions.unionAll(purchaseDays).unionAll(saleDays).distinct
}
// Daily activity per user: every active date left-joined with that day's purchase and
// sale aggregates; days without purchases/sales get 0 / 0.0 instead of null.
// The output column order (portal_id, session_date, user_id, items_bought, gmv_bought,
// items_sold, gmv_sold) must stay as is: the rolling-sum stage reads rows by position.
val activity = {
  val sameKeyAsBought =
    $"a.portal_id" === $"b.portal_id" &&
    $"a.session_date" === $"b.session_date" &&
    $"a.user_id" === $"b.user_id"
  val sameKeyAsSold =
    $"a.portal_id" === $"c.portal_id" &&
    $"a.session_date" === $"c.session_date" &&
    $"a.user_id" === $"c.user_id"

  val joined = activeDates.as("a")
    .join(boughtItems.as("b"), sameKeyAsBought, "left_outer")
    .join(soldItems.as("c"), sameKeyAsSold, "left_outer")

  joined
    .select(
      $"a.portal_id",
      $"a.session_date",
      $"a.user_id",
      coalesce($"items_bought", lit(0)).as("items_bought"),
      coalesce($"gmv_bought", lit(0.0)).as("gmv_bought"),
      coalesce($"items_sold", lit(0)).as("items_sold"),
      coalesce($"gmv_sold", lit(0.0)).as("gmv_sold"))
    .where($"user_id".isNotNull)
}
// Persist the result; the rolling-sum stage reloads it from this path.
activity.saveAsParquetFile("/user/jjarutis/activity")
/////////////////////////////////////////////////////////////////////////
// rolling sum calculation
// Window size passed to every RolingSum below. Presumably measured in days, since the
// positions fed alongside it come from daysSinceEpoch — confirm against RolingSum.
val WINDOW: Int = 90
/**
 * Number of whole calendar days between 1970-01-01 and the given date.
 *
 * The difference is computed on calendar dates, so it does not depend on the JVM default
 * time zone. Converting through `LocalDate.toDate.getTime` yields midnight in the default
 * zone; dividing those milliseconds by a day's length then shifts by one day whenever
 * local midnight falls before UTC midnight (e.g. Europe/London in summer but not in
 * winter), which would distort day distances across DST changes.
 *
 * @param date a date string accepted by Joda-Time's `LocalDate` constructor, e.g. "2015-03-12"
 * @return days since 1970-01-01 (negative for earlier dates), as a Long
 * @throws IllegalArgumentException if `date` cannot be parsed by `LocalDate`
 */
def daysSinceEpoch(date: String): Long =
  Days.daysBetween(new LocalDate(1970, 1, 1), new LocalDate(date)).getDays.toLong
/**
 * One output row of the rolling-sum stage: a user's figures as of a given date.
 *
 * The `roling_*` fields hold the values returned by the `RolingSum(WINDOW)` accumulators
 * and the `total_*` fields those returned by the `TotalSum` accumulators for the
 * corresponding input column. Field names become the column names of the saved table.
 */
case class RolingStats(
  portal_id: String,
  date: String,
  user_id: Int,
  // windowed figures
  roling_items_bought: Double,
  roling_gmv_bought: Double,
  roling_items_sold: Double,
  roling_gmv_sold: Double,
  // running totals
  total_items_bought: Double,
  total_gmv_bought: Double,
  total_items_sold: Double,
  total_gmv_sold: Double
)
// Reload the daily activity table written by the job above.
val activity = context.parquetFile("/user/jjarutis/activity")

// For every (portal, user): walk that user's days in ascending date order and emit one
// RolingStats per day. RolingSum / TotalSum come from vinted.math and are stateful
// accumulators here — presumably a windowed sum and a running total; confirm in that
// library. Rows are read by position, so the column order of the activity table matters.
// The chain keeps trailing dots so it can be pasted into the shell statement by statement.
val rolingSums = activity.
  where($"session_date" !== "NULL").
  rdd.
  map { row =>
    (row.getString(0), row.getString(1), row.getInt(2),
     row.getLong(3), row.getDouble(4), row.getLong(5), row.getDouble(6))
  }.
  groupBy(record => (record._1, record._3)).
  flatMap { case (_, userRecords) =>
    // Date strings are compared as text when ordering a user's records.
    val chronological = userRecords.toList.sortBy(_._2)

    // Fresh accumulators per user, so sums never leak between groups.
    val rollItemsBought = new RolingSum(WINDOW)
    val rollGmvBought = new RolingSum(WINDOW)
    val rollItemsSold = new RolingSum(WINDOW)
    val rollGmvSold = new RolingSum(WINDOW)
    val totalItemsBought = new TotalSum
    val totalGmvBought = new TotalSum
    val totalItemsSold = new TotalSum
    val totalGmvSold = new TotalSum

    chronological.map { case (portal, date, user, itemsBought, gmvBought, itemsSold, gmvSold) =>
      val days = daysSinceEpoch(date)
      RolingStats(
        portal,
        date,
        user,
        rollItemsBought(itemsBought, days),
        rollGmvBought(gmvBought, days),
        rollItemsSold(itemsSold, days),
        rollGmvSold(gmvSold, days),
        totalItemsBought(itemsBought),
        totalGmvBought(gmvBought),
        totalItemsSold(itemsSold),
        totalGmvSold(gmvSold))
    }
  }

// Persist the per-day statistics; the segmentation stage reloads them from this path.
rolingSums.toDF.saveAsParquetFile("/user/jjarutis/activity_roll")
/////////////////////////////////////////////////////////////////////////
// Activity dimensions
// UDF labelling a buyer from the windowed and all-time item counts:
//   total < 0.001 -> "New shopper"; windowed <= 2 -> "Occasional shopper";
//   2 < windowed <= 5 -> "Regular shopper"; windowed > 5 -> "Addicted shopper".
val boughtItemsSegment = udf { (windowed: Double, total: Double) =>
  if (total < 0.001) "New shopper"
  else if (windowed <= 2) "Occasional shopper"
  else if (windowed <= 5) "Regular shopper"
  else if (windowed > 5) "Addicted shopper"
  // A NaN input satisfies none of the comparisons; fail with a MatchError rather than
  // assign it a label.
  else throw new MatchError((windowed, total))
}
// UDF labelling a seller from the windowed and all-time item counts:
//   total < 0.001 -> "New seller"; windowed <= 2 -> "Occasional seller";
//   2 < windowed <= 8 -> "Regular seller"; windowed > 8 -> "Heavy seller".
val soldItemsSegment = udf { (windowed: Double, total: Double) =>
  if (total < 0.001) "New seller"
  else if (windowed <= 2) "Occasional seller"
  else if (windowed <= 8) "Regular seller"
  else if (windowed > 8) "Heavy seller"
  // A NaN input satisfies none of the comparisons; fail with a MatchError rather than
  // assign it a label.
  else throw new MatchError((windowed, total))
}
// Reload the rolling statistics and attach a buyer and a seller segment label to every
// (portal, date, user) row, then write the result out as Parquet.
val rolingSums = context.parquetFile("/user/jjarutis/activity_roll")
rolingSums.
  select(
    $"portal_id",
    $"date",
    $"user_id",
    $"roling_items_bought",
    $"roling_items_sold",
    boughtItemsSegment($"roling_items_bought", $"total_items_bought").as("buyer_activity"),
    soldItemsSegment($"roling_items_sold", $"total_items_sold").as("seller_activity")).
  saveAsParquetFile("/user/jjarutis/activity_segments")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment