-- DDL over the existing retail_db files in S3. EXTERNAL is the usual choice
-- here: dropping the table then leaves the underlying data in place.
CREATE EXTERNAL TABLE orders (
  order_id INT,
  order_date STRING,
  order_customer_id INT,
  order_status STRING
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://itversitydata/retail_db/orders';

CREATE EXTERNAL TABLE order_items (
  order_item_id INT,
  order_item_order_id INT,
  order_item_product_id INT,
  order_item_quantity INT,
  order_item_subtotal FLOAT,
  order_item_product_price FLOAT
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://itversitydata/retail_db/order_items';
-- Materialize daily revenue via CTAS. Note: some engines restrict
-- CREATE EXTERNAL TABLE ... AS SELECT (older Hive releases reject it;
-- Athena spells it CREATE TABLE ... WITH (external_location = ...) AS).
CREATE EXTERNAL TABLE daily_revenue
LOCATION 's3://itversitydata/retail_db/daily_revenue'
AS
SELECT o.order_date, round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o JOIN order_items oi
  ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY o.order_date
ORDER BY o.order_date;
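If the engine accepts the CTAS, a quick sanity check is to query the new table. A minimal sketch in PySpark, assuming the tables live in a metastore this Spark session can see:

spark.sql("""
    SELECT order_date, revenue
    FROM daily_revenue
    ORDER BY order_date
    LIMIT 10
""").show(truncate=False)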
// Scala (DataFrame API): the same pipeline. round and sum come from
// org.apache.spark.sql.functions; the $-column syntax needs the session
// implicits (both are imported automatically in spark-shell).
import org.apache.spark.sql.functions.{round, sum}
import spark.implicits._

val orders = spark.
  read.
  format("csv").
  schema("""order_id INT, order_date STRING,
            order_customer_id INT, order_status STRING
         """).
  load("s3://itversitydata/retail_db/orders")

val orderItems = spark.
  read.
  format("csv").
  schema("""order_item_id INT, order_item_order_id INT,
            order_item_product_id INT, order_item_quantity INT,
            order_item_subtotal FLOAT, order_item_product_price FLOAT
         """).
  load("s3://itversitydata/retail_db/order_items")
// Two shuffle partitions are plenty for this small dataset and keep the
// number of output files down.
spark.conf.set("spark.sql.shuffle.partitions", "2")
// Daily revenue: only COMPLETE and CLOSED orders count.
val dailyRevenue = orders.
  filter("order_status in ('COMPLETE', 'CLOSED')").
  join(orderItems, $"order_id" === $"order_item_order_id").
  groupBy("order_date").
  agg(round(sum($"order_item_subtotal"), 2).alias("revenue")).
  sort("order_date")
dailyRevenue.write.csv("s3://itversitydata/retail_db/daily_revenue")
# PySpark: the same pipeline. Import the aggregate functions up front
# (this deliberately shadows the built-in sum and round).
from pyspark.sql.functions import round, sum

orders = spark. \
    read. \
    format("csv"). \
    schema("""order_id INT, order_date STRING,
              order_customer_id INT, order_status STRING
           """). \
    load("s3://itversitydata/retail_db/orders")
orderItems = spark. \
    read. \
    format("csv"). \
    schema("""order_item_id INT, order_item_order_id INT,
              order_item_product_id INT, order_item_quantity INT,
              order_item_subtotal FLOAT, order_item_product_price FLOAT
           """). \
    load("s3://itversitydata/retail_db/order_items")
# Two shuffle partitions keep the number of output files down.
spark.conf.set("spark.sql.shuffle.partitions", "2")
# Daily revenue: only COMPLETE and CLOSED orders count.
dailyRevenue = orders. \
    filter("order_status in ('COMPLETE', 'CLOSED')"). \
    join(orderItems, orders.order_id == orderItems.order_item_order_id). \
    groupBy("order_date"). \
    agg(round(sum("order_item_subtotal"), 2).alias("revenue")). \
    orderBy("order_date")
dailyRevenue.write.csv("s3://itversitydata/retail_db/daily_revenue")
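One way to confirm the write is to load the files back with an explicit schema and preview them. A minimal sketch, assuming the bucket is readable from the same session (the column names mirror the aggregation above):

check = spark.read. \
    schema("order_date STRING, revenue FLOAT"). \
    csv("s3://itversitydata/retail_db/daily_revenue")
check.count()   # should equal dailyRevenue.count()
check.show(10, truncate=False)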