Skip to content

Instantly share code, notes, and snippets.

// Records of /public/retail_db/orders whose field at index 1 (presumably the
// order date -- confirm against the data set) contains "2013-08"; keeps the
// field at index 2 (presumably the customer id) parsed as Int.
val orderCustomers1 = sc.textFile("/public/retail_db/orders").
  map(_.split(",")).
  filter(fields => fields(1).contains("2013-08")).
  map(fields => fields(2).toInt)
// Same extraction for records whose index-1 field contains "2013-09".
val orderCustomers2 = sc.textFile("/public/retail_db/orders").
  map(_.split(",")).
  filter(fields => fields(1).contains("2013-09")).
  map(fields => fields(2).toInt)
// Pairs every customers record with the Int parsed from its first
// comma-separated field: (firstFieldAsInt, wholeRecord).
val customers = sc.textFile("/public/retail_db/customers").
  keyBy(record => record.split(",")(0).toInt)
// Index-2 field (as Int) of every orders record whose index-1 field contains
// "2013-08". The index-1 field is presumably the order date -- confirm.
val orderCustomers1 = sc.textFile("/public/retail_db/orders").
  filter { record =>
    val dateField = record.split(",")(1)
    dateField.contains("2013-08")
  }.
  map { record => record.split(",")(2).toInt }
// Same, for records whose index-1 field contains "2013-09".
val orderCustomers2 = sc.textFile("/public/retail_db/orders").
  filter { record =>
    val dateField = record.split(",")(1)
    dateField.contains("2013-09")
  }.
  map { record => record.split(",")(2).toInt }
// count is an action: each call triggers evaluation of the corresponding RDD.
orderCustomers1.count()
orderCustomers2.count()
// Variant that keeps the index-2 field as a String (no toInt conversion) for
// orders records whose index-1 field contains "2013-08".
val orderCustomers1 = sc.textFile("/public/retail_db/orders").
  map(line => line.split(",")).
  filter(columns => columns(1).contains("2013-08")).
  map(columns => columns(2))
// Same String-valued extraction for records matching "2013-09".
val orderCustomers2 = sc.textFile("/public/retail_db/orders").
  map(line => line.split(",")).
  filter(columns => columns(1).contains("2013-09")).
  map(columns => columns(2))
// Actions: force evaluation and report how many records each RDD holds.
orderCustomers1.count()
orderCustomers2.count()
val orders = sc.textFile("/public/retail_db/orders")
// (first field as Int, field at index 3) for every orders record.
val ordersMap = orders.map { line =>
  val fields = line.split(",")
  (fields(0).toInt, fields(3))
}
val orderItems = sc.textFile("/public/retail_db/order_items")
// (field at index 1 as Int, whole record) for every order_items record.
val orderItemsMap = orderItems.map { line =>
  val joinKey = line.split(",")(1).toInt
  (joinKey, line)
}
// Left outer join: every ordersMap entry is kept; the order_items side is an
// Option that is None when no order_items record shares the key.
val ordersLeftOuterJoin = ordersMap.leftOuterJoin(orderItemsMap)
val orders = sc.textFile("/public/retail_db/orders")
// Whole orders record keyed by its first field parsed as Int.
val ordersMap = orders.keyBy(_.split(",")(0).toInt)
val orderItems = sc.textFile("/public/retail_db/order_items")
// Whole order_items record keyed by its index-1 field parsed as Int.
val orderItemsMap = orderItems.keyBy(_.split(",")(1).toInt)
// Keeps every orders record; the matching order_items record is wrapped in an
// Option (None when the key has no order_items entry).
val ordersLeftOuterJoin = ordersMap.leftOuterJoin(orderItemsMap)
val orders = sc.textFile("/public/retail_db/orders")
// first field (as Int) -> whole orders record
val ordersMap = orders.map(order => order.split(",")(0).toInt -> order)
val orderItems = sc.textFile("/public/retail_db/order_items")
// index-1 field (as Int) -> whole order_items record
val orderItemsMap = orderItems.map(item => item.split(",")(1).toInt -> item)
// Inner join: only keys present on both sides produce output pairs.
val ordersJoin = ordersMap.join(orderItemsMap)
val orders = sc.textFile("/public/retail_db/orders")
// Each orders record paired with the Int parsed from its first field.
val ordersMap = orders.map { record =>
  val key = record.split(",")(0).toInt
  (key, record)
}
val products = sc.textFile("/public/retail_db/products")
// Products keyed by the Int parsed from the field at index 1 (presumably the
// category id -- confirm against the data set). Records whose index-4 field is
// empty are dropped first, because that field is parsed with toFloat below and
// an empty string would make the conversion throw.
val productsMap = products.
  filter(p => p.split(",")(4) != "").
  map(p => (p.split(",")(1).toInt, p))
// For every key: sort that key's records by the index-4 field (as Float) in
// descending order and keep the first three. flatMap flattens the per-key
// lists into a single RDD of records.
// The lambda block is now closed; the original left `flatMap(p => {` open,
// which made this snippet unparseable and swallowed the statements after it.
productsMap.
  groupByKey.
  flatMap { case (_, grouped) =>
    grouped.toList.
      sortBy(k => k.split(",")(4).toFloat)(Ordering.Float.reverse).
      take(3)
  }
val products = sc.textFile("/public/retail_db/products")
// Products keyed by the Int parsed from the field at index 1 (presumably the
// category id -- confirm). Records with an empty index-4 field are removed so
// the toFloat conversion used for sorting cannot fail on them.
val productsMap = products.
  filter(p => p.split(",")(4) != "").
  map(p => (p.split(",")(1).toInt, p))
// For every key: all of that key's records sorted by the index-4 field (as
// Float), highest first, flattened into one RDD.
// NOTE(review): the original snippet stopped inside an unclosed
// `flatMap(p => {` block. It is closed here without adding any further step;
// confirm whether a trailing operation (e.g. a take) was intended.
productsMap.
  groupByKey.
  flatMap { case (_, grouped) =>
    grouped.toList.
      sortBy(k => k.split(",")(4).toFloat)(Ordering.Float.reverse)
  }
val orderItems = sc.textFile("/public/retail_db/order_items")
// (index-1 field as Int, index-4 field as Float) for every order_items record.
val orderItemsMap = orderItems.map { line =>
  val fields = line.split(",")
  (fields(1).toInt, fields(4).toFloat)
}
// Per key: (sum of the Float values, number of values). The zero value is
// (0.0, 0), so the running sum is a Double and each Float is widened when
// it is added.
val revenueAndCountPerOrder = orderItemsMap.aggregateByKey((0.0, 0))(
  // Fold one value into a partition-local accumulator.
  { case ((revenue, count), subtotal) => (revenue + subtotal, count + 1) },
  // Merge two accumulators built for the same key.
  { case ((revenueA, countA), (revenueB, countB)) => (revenueA + revenueB, countA + countB) }
)