PySpark Code Snippets
# Daily product revenue, Spark SQL version.
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, FloatType
from pyspark.sql.functions import *

spark = SparkSession.\
    builder.\
    master('local').\
    appName('Daily Product Revenue').\
    getOrCreate()
# Keep shuffle parallelism low for a small local run.
spark.conf.set('spark.sql.shuffle.partitions', '2')
# The CSVs carry no header row, so name the columns explicitly and cast
# the string columns to their proper types.
ordersCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\orders").\
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
orders = ordersCSV.\
    withColumn('order_id', ordersCSV.order_id.cast(IntegerType())).\
    withColumn('order_date', ordersCSV.order_date.cast(DateType())).\
    withColumn('order_customer_id', ordersCSV.order_customer_id.cast(IntegerType()))
orderItemCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\order_items").\
    toDF('order_item_id', 'order_item_order_id', 'order_item_product_id',
         'order_item_quantity', 'order_item_subtotal', 'order_item_product_price')
orderItems = orderItemCSV.\
    withColumn('order_item_id', orderItemCSV.order_item_id.cast(IntegerType())).\
    withColumn('order_item_order_id', orderItemCSV.order_item_order_id.cast(IntegerType())).\
    withColumn('order_item_product_id', orderItemCSV.order_item_product_id.cast(IntegerType())).\
    withColumn('order_item_quantity', orderItemCSV.order_item_quantity.cast(IntegerType())).\
    withColumn('order_item_subtotal', orderItemCSV.order_item_subtotal.cast(FloatType())).\
    withColumn('order_item_product_price', orderItemCSV.order_item_product_price.cast(FloatType()))
orders.createOrReplaceTempView('orders')
orderItems.createOrReplaceTempView('orderItems')
# Revenue per date and product for completed/closed orders.
dailyProductRevenue = spark.sql('''select o.order_date, oi.order_item_product_id,
    round(sum(order_item_subtotal), 2) order_revenue
    from orders o join orderItems oi
    on o.order_id = oi.order_item_order_id
    where o.order_status in ("COMPLETE", "CLOSED")
    group by o.order_date, oi.order_item_product_id
    order by o.order_date, order_revenue desc''')
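# Hedged sanity check (an assumption, not in the original gist): preview a few
# rows before writing the result out.
dailyProductRevenue.show(5, truncate=False)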
dailyProductRevenue.write.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\daily_product_revenue_sql")
# Getting started: read the orders CSV and cast the id columns.
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.master('local').appName('Getting Started').getOrCreate()
#orders = spark.read.csv('/user/prafullapatigmail/retail_db/orders')
#orders.printSchema()
#orders.show()
orderCSV = spark.read.csv('/user/prafullapatigmail/retail_db/orders').toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
orders = orderCSV.\
    withColumn('order_id', orderCSV.order_id.cast(IntegerType())).\
    withColumn('order_customer_id', orderCSV.order_customer_id.cast(IntegerType()))
orders.printSchema()
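# Hedged alternative (an assumption, not in the original): let Spark infer the
# column types at read time instead of casting afterwards.
ordersInferred = spark.read.csv('/user/prafullapatigmail/retail_db/orders', inferSchema=True).\
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
ordersInferred.printSchema()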
# Daily product revenue, DataFrame API version.
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, FloatType
from pyspark.sql.functions import *

spark = SparkSession.\
    builder.\
    master('local').\
    appName('Daily Product Revenue').\
    getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', '2')
ordersCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\orders").\
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
orders = ordersCSV.\
    withColumn('order_id', ordersCSV.order_id.cast(IntegerType())).\
    withColumn('order_date', ordersCSV.order_date.cast(DateType())).\
    withColumn('order_customer_id', ordersCSV.order_customer_id.cast(IntegerType()))
orderItemCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\order_items").\
    toDF('order_item_id', 'order_item_order_id', 'order_item_product_id',
         'order_item_quantity', 'order_item_subtotal', 'order_item_product_price')
orderItems = orderItemCSV.\
    withColumn('order_item_id', orderItemCSV.order_item_id.cast(IntegerType())).\
    withColumn('order_item_order_id', orderItemCSV.order_item_order_id.cast(IntegerType())).\
    withColumn('order_item_product_id', orderItemCSV.order_item_product_id.cast(IntegerType())).\
    withColumn('order_item_quantity', orderItemCSV.order_item_quantity.cast(IntegerType())).\
    withColumn('order_item_subtotal', orderItemCSV.order_item_subtotal.cast(FloatType())).\
    withColumn('order_item_product_price', orderItemCSV.order_item_product_price.cast(FloatType()))
# Filter to completed/closed orders, join to the items, and aggregate revenue
# per date and product.
dailyProductRevenue = orders.where('order_status in ("COMPLETE", "CLOSED")').\
    join(orderItems, orders.order_id == orderItems.order_item_order_id).\
    groupBy('order_date', 'order_item_product_id').\
    agg(round(sum('order_item_subtotal'), 2).alias('revenue'))
dailyProductRevenueSorted = dailyProductRevenue.orderBy('order_date', dailyProductRevenue.revenue.desc())
dailyProductRevenueSorted.write.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\ordersorted")
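# Hedged variant (an assumption, not in the original): coalescing to a single
# partition yields one CSV part file, convenient for small local outputs.
# The "_single" output path below is hypothetical.
dailyProductRevenueSorted.coalesce(1).write.mode('overwrite').\
    csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\ordersorted_single")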
from pyspark.sql import SparkSession
def process_partition(partition):
    # Sum the elements of one partition in a single yield.
    yield sum(partition)

def process_partition_y_sum(partition):
    # Same sum, written out with an explicit loop.
    number_sum = 0
    for i in partition:
        number_sum += i
    yield number_sum

def process_partition_y_size(partition):
    # Length of each element in the partition (line lengths for a text file).
    length = []
    for i in partition:
        length.append(len(i))
    yield length
if __name__ == "__main__":
    spark = SparkSession \
        .builder \
        .appName("SparkSession Object") \
        .master("local[*]") \
        .enableHiveSupport() \
        .getOrCreate()
    l = [1, 1, 1, 2, 2, 2, 3, 3, 3]
    lrdd = spark.sparkContext.parallelize(l, 3)
    print(lrdd.getNumPartitions())
    print(lrdd.mapPartitions(process_partition).collect())
    """strlist = ["Ally", "Adiva", "Prafulla", "Sagarika"]
    srdd = spark.sparkContext.parallelize(strlist, 2)
    print(srdd.map(lambda s: s.upper()).collect())"""
    text = spark.sparkContext.textFile("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\partitions", 2)
    print(text.getNumPartitions())
    print(text.mapPartitions(process_partition_y_size).collect())
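    # Hedged extra (an assumption, not in the original): mapPartitionsWithIndex
    # exposes the partition number, handy for seeing how elements were distributed.
    print(lrdd.mapPartitionsWithIndex(lambda idx, it: [(idx, list(it))]).collect())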
    spark.stop()
# Exercise 1 from the "Six Spark Exercises" dataset.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder\
    .master("local")\
    .config("spark.sql.autoBroadcastJoinThreshold", -1)\
    .config("spark.executor.memory", "500mb")\
    .appName("Exercise1")\
    .getOrCreate()
products = spark.read.parquet('C:\\Users\\Sagarika\\Downloads\\DatasetToCompleteTheSixSparkExercises\\products_parquet')
sales = spark.read.parquet('C:\\Users\\Sagarika\\Downloads\\DatasetToCompleteTheSixSparkExercises\\sales_parquet')
sellers = spark.read.parquet('C:\\Users\\Sagarika\\Downloads\\DatasetToCompleteTheSixSparkExercises\\sellers_parquet')
# print("Product Count = ", products.count())
# print("Sales Count = ", sales.count())
# print("Sellers Count = ", sellers.count())
print("Number of products sold at least once")
# sales.select(countDistinct("product_id")).show()
sales.createTempView('sales')
spark.sql('select count(distinct product_id) from sales').show()
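# Hedged follow-up sketch (an assumption, not part of the original exercise):
# products that never appear in sales, via a left anti join. The product_id
# column names are assumed from the parquet schemas.
unsold = products.join(sales, products.product_id == sales.product_id, 'left_anti')
print("Products never sold =", unsold.count())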
# Minimal DataFrame from a list of Row objects.
from pyspark.sql import SparkSession
from pyspark.sql.types import Row

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("Hello World")\
        .master("local[*]")\
        .getOrCreate()
    df_rows_list = [Row(id=1, name="Prafulla", city="Odisha"), Row(id=1, name="Sagu", city="Odisha")]
    df = spark.createDataFrame(df_rows_list)
    df.show(10, False)
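    # Hedged variant (an assumption, not in the original): the same frame built
    # from plain tuples with an explicit schema, skipping inference from Rows.
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    schema = StructType([
        StructField("id", IntegerType()),
        StructField("name", StringType()),
        StructField("city", StringType())
    ])
    df_explicit = spark.createDataFrame([(1, "Prafulla", "Odisha"), (2, "Sagu", "Odisha")], schema)
    df_explicit.show(10, False)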
    spark.stop()
# Top N products per day by revenue, DataFrame API with a window function.
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, FloatType
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

spark = SparkSession.\
    builder.\
    master('local').\
    appName('Daily Product Revenue').\
    getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', '2')
ordersCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\orders").\
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
orders = ordersCSV.\
    withColumn('order_id', ordersCSV.order_id.cast(IntegerType())).\
    withColumn('order_date', ordersCSV.order_date.cast(DateType())).\
    withColumn('order_customer_id', ordersCSV.order_customer_id.cast(IntegerType()))
orderItemCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\order_items").\
    toDF('order_item_id', 'order_item_order_id', 'order_item_product_id',
         'order_item_quantity', 'order_item_subtotal', 'order_item_product_price')
orderItems = orderItemCSV.\
    withColumn('order_item_id', orderItemCSV.order_item_id.cast(IntegerType())).\
    withColumn('order_item_order_id', orderItemCSV.order_item_order_id.cast(IntegerType())).\
    withColumn('order_item_product_id', orderItemCSV.order_item_product_id.cast(IntegerType())).\
    withColumn('order_item_quantity', orderItemCSV.order_item_quantity.cast(IntegerType())).\
    withColumn('order_item_subtotal', orderItemCSV.order_item_subtotal.cast(FloatType())).\
    withColumn('order_item_product_price', orderItemCSV.order_item_product_price.cast(FloatType()))
dailyProductRevenue = orders.where('order_status in ("COMPLETE", "CLOSED")').\
    join(orderItems, orders.order_id == orderItems.order_item_order_id).\
    groupBy('order_date', 'order_item_product_id').\
    agg(round(sum('order_item_subtotal'), 2).alias('revenue'))
# Rank products within each date by revenue, highest first.
spec = Window.partitionBy(dailyProductRevenue.order_date).\
    orderBy(dailyProductRevenue.revenue.desc())
dailyProductRevenueRanked = dailyProductRevenue.withColumn('rnk', rank().over(spec))
dailyProductRevenueRanked.\
    where(dailyProductRevenueRanked.rnk <= 5).\
    orderBy(dailyProductRevenueRanked.order_date, dailyProductRevenueRanked.revenue.desc()).show()
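# Hedged aside (an assumption, not in the original): dense_rank() over the same
# window avoids gaps in rank numbers when two products tie on revenue.
from pyspark.sql.functions import dense_rank
dailyProductRevenueDenseRanked = dailyProductRevenue.withColumn('rnk', dense_rank().over(spec))
dailyProductRevenueDenseRanked.where(dailyProductRevenueDenseRanked.rnk <= 5).show()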
# Top N products per day by revenue, Spark SQL version.
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, FloatType
from pyspark.sql.functions import *

spark = SparkSession.\
    builder.\
    master('local').\
    appName('Daily Product Revenue').\
    getOrCreate()
spark.conf.set('spark.sql.shuffle.partitions', '2')
ordersCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\orders").\
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
orders = ordersCSV.\
    withColumn('order_id', ordersCSV.order_id.cast(IntegerType())).\
    withColumn('order_date', ordersCSV.order_date.cast(DateType())).\
    withColumn('order_customer_id', ordersCSV.order_customer_id.cast(IntegerType()))
orderItemCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\order_items").\
    toDF('order_item_id', 'order_item_order_id', 'order_item_product_id',
         'order_item_quantity', 'order_item_subtotal', 'order_item_product_price')
orderItems = orderItemCSV.\
    withColumn('order_item_id', orderItemCSV.order_item_id.cast(IntegerType())).\
    withColumn('order_item_order_id', orderItemCSV.order_item_order_id.cast(IntegerType())).\
    withColumn('order_item_product_id', orderItemCSV.order_item_product_id.cast(IntegerType())).\
    withColumn('order_item_quantity', orderItemCSV.order_item_quantity.cast(IntegerType())).\
    withColumn('order_item_subtotal', orderItemCSV.order_item_subtotal.cast(FloatType())).\
    withColumn('order_item_product_price', orderItemCSV.order_item_product_price.cast(FloatType()))
orders.createOrReplaceTempView('orders')
orderItems.createOrReplaceTempView('orderItems')
# Revenue per date and product for completed/closed orders.
dailyProductRevenue = spark.sql('''select o.order_date, oi.order_item_product_id,
    round(sum(order_item_subtotal), 2) order_revenue
    from orders o join orderItems oi
    on o.order_id = oi.order_item_order_id
    where o.order_status in ("COMPLETE", "CLOSED")
    group by o.order_date, oi.order_item_product_id
    order by o.order_date, order_revenue desc''')
dailyProductRevenue.createOrReplaceTempView('dailyProductRevenue')
# Keep the top five products per date using rank() over a window.
topNDailyProducts = spark.sql('''select * from (select d.*,
    rank() over (partition by order_date order by order_revenue desc) rnk
    from dailyProductRevenue d) q where rnk <= 5
    order by order_date, order_revenue desc''')
topNDailyProducts.write.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\topNDailyProducts_sql")
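# Hedged alternative (an assumption, not in the original): row_number() returns
# exactly five rows per date even when revenues tie, unlike rank().
topNDailyProductsRowNum = spark.sql('''select * from (select d.*,
    row_number() over (partition by order_date order by order_revenue desc) rn
    from dailyProductRevenue d) q where rn <= 5
    order by order_date, order_revenue desc''')
topNDailyProductsRowNum.show()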