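application.properties — read by the script below from src/main/resources/application.properties, with one profile per environment: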
[dev]
executionMode = local
input.base.dir = /Users/itversity/Research/data/retail_db
output.base.dir = /Users/itversity/Research/data/bootcamp/pyspark
[prod]
executionMode = yarn-client
input.base.dir = /public/retail_db
output.base.dir = /user/training/bootcamp/pyspark
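
TopNDailyProductsDFO.py — computes the top N products by daily revenue from the retail_db orders and order_items data sets, using Data Frame operations and a window function: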
import configparser as cp
import sys

from pyspark.sql import SparkSession
# Note: sum and round here shadow the Python built-ins of the same name
from pyspark.sql.functions import sum, round, dense_rank
from pyspark.sql.window import Window

# Pick the environment profile ([dev] or [prod]) and topN from the command line
props = cp.RawConfigParser()
props.read('src/main/resources/application.properties')
env = sys.argv[1]
topN = int(sys.argv[2])

spark = SparkSession. \
    builder. \
    master(props.get(env, 'executionMode')). \
    appName('Get TopN Daily Products using Data Frame Operations'). \
    getOrCreate()

# The data set is small; cap shuffle partitions at 2 and quiet the logs
spark.conf.set('spark.sql.shuffle.partitions', '2')
spark.sparkContext.setLogLevel('ERROR')

inputBaseDir = props.get(env, 'input.base.dir')

orders = spark.read. \
    format('csv'). \
    schema('order_id int, order_date string, order_customer_id int, order_status string'). \
    load(inputBaseDir + '/orders')

orderItems = spark.read. \
    format('csv'). \
    schema('''order_item_id int,
               order_item_order_id int,
               order_item_product_id int,
               order_item_quantity int,
               order_item_subtotal float,
               order_item_product_price float
           '''). \
    load(inputBaseDir + '/order_items')

# Revenue per product per day, considering only completed or closed orders
dailyProductRevenue = orders. \
    where('order_status in ("COMPLETE", "CLOSED")'). \
    join(orderItems, orders.order_id == orderItems.order_item_order_id). \
    groupBy('order_date', 'order_item_product_id'). \
    agg(round(sum('order_item_subtotal'), 2).alias('revenue'))

# Rank products within each day by revenue; dense_rank leaves no gaps on ties
spec = Window. \
    partitionBy('order_date'). \
    orderBy(dailyProductRevenue.revenue.desc())

dailyProductRevenueRanked = dailyProductRevenue. \
    withColumn('rnk', dense_rank().over(spec))

# Keep the top N per day, then drop the helper rank column
topNDailyProducts = dailyProductRevenueRanked. \
    where(dailyProductRevenueRanked.rnk <= topN). \
    drop('rnk'). \
    orderBy('order_date', dailyProductRevenueRanked.revenue.desc())

outputBaseDir = props.get(env, 'output.base.dir')

topNDailyProducts. \
    write. \
    csv(outputBaseDir + '/topn_daily_products')
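
To sanity-check the result, the output directory can be read back with an explicit schema — a minimal sketch, assuming the column order from the groupBy/agg above (the CSV is written without a header, so the names here are for inspection only):

# Quick check from a pyspark shell; outputBaseDir matches the script above
topn = spark.read. \
    schema('order_date string, order_item_product_id int, revenue float'). \
    csv(outputBaseDir + '/topn_daily_products')
topn.show(10)

Submitting the script against the [prod] profile on YARN: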
# usage: <env> <topN> — the script reads both arguments; 5 is an example value
spark-submit \
  --master yarn \
  --deploy-mode client \
  --conf spark.ui.port=12901 \
  src/main/python/retail_db/df/TopNDailyProductsDFO.py \
  prod 5
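
For local testing, the [dev] profile sets executionMode = local, so the master is picked up from the properties file and no cluster is needed (topN of 5 again illustrative):

spark-submit \
  src/main/python/retail_db/df/TopNDailyProductsDFO.py \
  dev 5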