import sys
import configparser as cp
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
props = cp.RawConfigParser()
props.read('src/resources/application.properties')
env = sys.argv[1]
# executionMode comes from the env-specific section of the properties file
spark = SparkSession.builder\
    .master(props.get(env, 'executionMode'))\
    .getOrCreate()

# src/resources/application.properties
[dev]
executionMode = local[*]
input.base.dir = /Users/mug/retail_db
output.base.dir = /Users/mug/retail_db/pyspark
[prod]
# 'yarn-client' as a master string was removed in Spark 2.0; use 'yarn'
# (client deploy mode is the default when launched this way)
executionMode = yarn
input.base.dir = /user/hadoop/retail_db
output.base.dir = /user/hadoop/pyspark
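# The properties section to use is picked at submit time via sys.argv[1],
# e.g. (app.py is a hypothetical entry-point name):
#   spark-submit app.py dev
# The env-keyed lookups then resolve per environment:
input_base_dir = props.get(env, 'input.base.dir')
output_base_dir = props.get(env, 'output.base.dir')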
sorted_department_daily_revenue.write.format("parquet")\
.partitionBy("order_date")\
.option("path", "/user/hive/wharehouse/sorted_department_daily_revenue")\
.saveAsTable('sorted_department_daily_revenue')
#hive> SELECT * FROM sorted_department_daily_revenue;
# 2013-08-01 25047.03 Fan Shop
# 2013-08-01 11148.45 Apparel
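# Because the table is partitioned by order_date, a filter on that column
# lets Spark prune partitions instead of scanning everything (sketch):
spark.table('sorted_department_daily_revenue')\
    .where(F.col('order_date') == '2013-08-01')\
    .show()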
from pyspark.sql import SparkSession
import os
SUBMIT_ARGS = "--packages org.apache.spark:spark-avro_2.12:3.0.0 pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
spark = SparkSession.builder\
.master('local')\
.appName('Jupyter')\
.getOrCreate()
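# With spark-avro on the classpath, 'avro' works as an ordinary format
# string; a write sketch reusing the top5_customers frame from below:
top5_customers.write.format('avro').save('avro_top5_customers')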
top5_customers.write.json('json_top5_customers')
# {"customer_id":9337,"customer_spending":6585.33,"customer_fname":"Mary","customer_lname":"Smith"}
# {"customer_id":3710,"customer_spending":6169.4,"customer_fname":"Ashley","customer_lname":"Smith"}
# {"customer_id":2723,"customer_spending":6159.25,"customer_fname":"Mary","customer_lname":"Brady"}
# {"customer_id":10351,"customer_spending":5989.5,"customer_fname":"Teresa","customer_lname":"Gray"}
# {"customer_id":8314,"customer_spending":5989.29,"customer_fname":"Angela","customer_lname":"Walsh"}
sorted_product_daily_revenue.write.format('parquet')\
.mode('overwrite')\
.partitionBy('product_name')\
.save('parquet_sorted_product_daily_revenue')
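# On read, Spark discovers the product_name=... directories and restores
# product_name as a regular column (sketch):
parquet_back = spark.read.parquet('parquet_sorted_product_daily_revenue')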
sorted_product_daily_revenue.coalesce(1).write\
    .orc('orc_sorted_product_daily_revenue',
         partitionBy='product_name',
         compression='lzo')
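# Note: 'lzo' needs the native LZO codec installed on every executor;
# 'zlib' (the ORC default) and 'snappy' work out of the box. Read-back sketch:
orc_back = spark.read.orc('orc_sorted_product_daily_revenue')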
top5_customers.write.csv('top5_customers', header=True)
# customer_id,customer_spending,customer_fname,customer_lname
# 9337,6585.33,Mary,Smith
# 3710,6169.4,Ashley,Smith
# 2723,6159.25,Mary,Brady
# 10351,5989.5,Teresa,Gray
# 8314,5989.29,Angela,Walsh
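# CSV stores no type information: read back without inferSchema (or an
# explicit schema), every column comes back as string (sketch):
csv_back = spark.read.csv('top5_customers', header=True, inferSchema=True)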
pipe_sep_top5_customers = top5_customers.rdd\
.map(lambda row: list(row.asDict().values()))\
.map(lambda row: ["|".join(map(str,row))])\
.toDF(schema='value string')
pipe_sep_top5_customers.write.text('pipe_sep_top5_customers')
# 9337|6585.33|Mary|Smith
# 3710|6169.4|Ashley|Smith
# 2723|6159.25|Mary|Brady
# 10351|5989.5|Teresa|Gray
# 8314|5989.29|Angela|Walsh
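# The same pipe-separated output is possible without dropping to the RDD
# API: concat_ws joins the columns into one string column (sketch; the
# explicit casts keep non-string columns safe):
top5_customers\
    .select(F.concat_ws('|', *[F.col(c).cast('string') for c in top5_customers.columns]).alias('value'))\
    .write.text('pipe_sep_top5_customers_df')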
#orders = spark.read.csv..
#order_items = spark.read.csv..
#customers = spark.read.csv..
orders_sel = orders\
.withColumn('order_date', F.substring('order_date', 1, 10))
order_items_sel = order_items\
.select('order_item_order_id', 'order_item_product_id', 'order_item_subtotal')
orders_done = orders_sel\
.where(F.col('order_status').isin('CLOSED', 'COMPLETE'))
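# From orders_done and order_items_sel the daily revenue frames used above
# can be derived (sketch, assuming the standard retail_db column names):
daily_revenue = orders_done\
    .join(order_items_sel, orders_done.order_id == order_items_sel.order_item_order_id)\
    .groupBy('order_date')\
    .agg(F.round(F.sum('order_item_subtotal'), 2).alias('revenue'))\
    .orderBy('order_date')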