Last active
August 29, 2015 14:24
-
-
Save jhowliu/40c6f0bd0a10fef25e3a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
s |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Staging table over the raw web-server access logs.
--
-- Each log line is split into columns by the RegexSerDe pattern below; the
-- nine capture groups map positionally onto the nine columns:
--   1 ip            first space-delimited token on the line
--   2 day           text between the square brackets (the request timestamp)
--   3 method        first token of the quoted request line
--   4 url           second token of the quoted request line
--   5 http_version  third token of the quoted request line
--   6 code1         first run of digits after the request (presumably the HTTP status; confirm)
--   7 code2         second run of digits (presumably the response size; confirm)
--   8 dash          first quoted field after the numeric fields
--   9 user_agent    last quoted field
-- Every column is STRING because the SerDe yields the captured text as-is.
--
-- The table is EXTERNAL, so dropping it leaves the files under LOCATION in
-- place. IF NOT EXISTS makes the script safe to re-run.
--
-- NOTE(review): "output.format.string" is only meaningful for serialization;
-- the built-in org.apache.hadoop.hive.serde2.RegexSerDe appears to be
-- deserialize-only and to ignore it. It is kept unchanged here; confirm
-- before removing it.
CREATE EXTERNAL TABLE IF NOT EXISTS intermediate_access_logs (
    ip STRING,
    day STRING,
    method STRING,
    url STRING,
    http_version STRING,
    code1 STRING,
    code2 STRING,
    dash STRING,
    user_agent STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "([^ ]*) - - \\[([^\\]]*)\\] \"([^\ ]*) ([^\ ]*) ([^\ ]*)\" (\\d*) (\\d*) \"([^\"]*)\" \"([^\"]*)\"",
    "output.format.string" = "%1$s %2$s %3$s %4$s %5$s %6$s %7$s %8$s %9$s"
)
LOCATION '/user/eva/warehouse/original_access_logs';
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.{AvroInputFormat, AvroWrapper}
import org.apache.hadoop.io.NullWritable

// spark-shell script: finds the pairs of products that most often appear
// together in the same order. `sc` is the SparkContext supplied by the shell.

// Both inputs are read as Avro files from the warehouse directory on HDFS.
val warehouse = "hdfs://localhost:8020/user/eva/warehouse/"
val order_items_path = warehouse + "order_items"
val order_items = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](order_items_path)
val products_path = warehouse + "products"
val products = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](products_path)

// Next, we extract the fields from order_items and products that we care about
// and get a list of every product, its name and quantity, grouped by order.
// order_items is keyed by product id and joined to products (also keyed by
// product id); the result is re-keyed by order id:
//   order_id -> Iterable[(quantity, product_name)]
// Int.unbox assumes order_item_order_id and order_item_quantity are Avro int
// fields (TODO confirm against the schema).
val orders = order_items.map { x => (
    x._1.datum.get("order_item_product_id"),
    (x._1.datum.get("order_item_order_id"), x._1.datum.get("order_item_quantity")))
}.join(
  products.map { x => (
    x._1.datum.get("product_id"),
    (x._1.datum.get("product_name")))
  }
).map(x => (
    scala.Int.unbox(x._2._1._1), // order_id
    (
        scala.Int.unbox(x._2._1._2), // quantity
        x._2._2.toString // product_name
    )
)).groupByKey()

// Finally, we tally how many times each combination of products appears
// together in an order, and print the 10 most common combinations.
// For every order, each 2-element combination of its line items becomes
//   ((name_a, name_b), quantity_a * quantity_b)
// with the two names put in lexicographic order so that (a, b) and (b, a)
// fall under the same key. Because each pair is weighted by the product of
// the two quantities, the tally is quantity-weighted rather than a plain
// count of orders.
val cooccurrences = orders.map(order =>
  (
    order._1,
    order._2.toList.combinations(2).map(order_pair =>
        (
            if (order_pair(0)._2 < order_pair(1)._2) (order_pair(0)._2, order_pair(1)._2) else (order_pair(1)._2, order_pair(0)._2),
            order_pair(0)._1 * order_pair(1)._1
        )
    )
  )
)

// Drop the order id, then sum the weights per product pair across all orders.
val combos = cooccurrences.flatMap(x => x._2).reduceByKey((a, b) => a + b)

// Swap to (weight, pair) so the weight is the sort key; take the 10 largest.
val mostCommon = combos.map(x => (x._2, x._1)).sortByKey(ascending = false).take(10)

// Print one "(weight,(name_a,name_b))" line per combination.
mostCommon.foreach(println)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment