Last active
March 1, 2022 04:18
-
-
Save prafullapati/e332e8afc004f02257a40ff3687335f5 to your computer and use it in GitHub Desktop.
PySpark Codes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, FloatType
from pyspark.sql.functions import *

# Daily product revenue (Spark SQL variant): load the retail_db orders and
# order_items CSV extracts, join completed/closed orders to their items,
# aggregate revenue per (order_date, product), and persist the result as CSV.
spark = SparkSession.\
    builder.\
    master('local').\
    appName('Daily Product Revenue').\
    getOrCreate()
# Tiny local data set -- 2 shuffle partitions instead of the default 200.
spark.conf.set('spark.sql.shuffle.partitions', '2')

# CSV loads as all-string columns: name them, then cast the typed ones.
ordersCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\orders").\
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
orders = ordersCSV.\
    withColumn('order_id', ordersCSV.order_id.cast(IntegerType())).\
    withColumn('order_date', ordersCSV.order_date.cast(DateType())).\
    withColumn('order_customer_id', ordersCSV.order_customer_id.cast(IntegerType()))

orderItemCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\order_items").\
    toDF('order_item_id', 'order_item_order_id', 'order_item_product_id',
         'order_item_quantity', 'order_item_subtotal', 'order_item_product_price')
orderItems = orderItemCSV.\
    withColumn('order_item_id', orderItemCSV.order_item_id.cast(IntegerType())).\
    withColumn('order_item_order_id', orderItemCSV.order_item_order_id.cast(IntegerType())).\
    withColumn('order_item_product_id', orderItemCSV.order_item_product_id.cast(IntegerType())).\
    withColumn('order_item_quantity', orderItemCSV.order_item_quantity.cast(IntegerType())).\
    withColumn('order_item_subtotal', orderItemCSV.order_item_subtotal.cast(FloatType())).\
    withColumn('order_item_product_price', orderItemCSV.order_item_product_price.cast(FloatType()))

orders.createOrReplaceTempView('orders')
orderItems.createOrReplaceTempView('orderItems')

# BUG FIX: the original selected/grouped by oi.order_item_order_id (the order
# id), which yields revenue per *order*, not per *product*, despite the job
# name. Group by the product id instead, matching the DataFrame version of
# this job.
dailyProductRevenue = spark.sql('''select o.order_date, oi.order_item_product_id,
    round(sum(order_item_subtotal),2) order_revenue
    from orders o join orderItems oi
    on o.order_id = oi.order_item_order_id
    where o.order_status in ("COMPLETE","CLOSED")
    group by o.order_date, oi.order_item_product_id
    order by o.order_date, order_revenue desc''')
# NOTE(review): output dir name contains a typo ("rvenue") -- kept as-is so
# downstream consumers pointed at the existing path keep working.
dailyProductRevenue.write.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\daily_product_rvenue_sql")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

# Minimal "getting started" example: load the orders CSV, name its columns,
# cast the two id columns to integers, and print the resulting schema.
spark = (
    SparkSession.builder
    .master('local')
    .appName('Getting Started')
    .getOrCreate()
)

# Raw CSV arrives as all-string columns; assign names first.
orderCSV = (
    spark.read.csv('/user/prafullapatigmail/retail_db/orders')
    .toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
)

# Only the id columns are cast here; order_date/order_status stay strings.
orders = (
    orderCSV
    .withColumn('order_id', orderCSV['order_id'].cast(IntegerType()))
    .withColumn('order_customer_id', orderCSV['order_customer_id'].cast(IntegerType()))
)
orders.printSchema()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, FloatType
from pyspark.sql.functions import *

# Daily product revenue (DataFrame API variant): join completed/closed orders
# to their items, sum revenue per (order_date, product), sort, write as CSV.
spark = (
    SparkSession.builder
    .master('local')
    .appName('Daily Product Revenue')
    .getOrCreate()
)
# Small local data set -- keep the shuffle partition count low.
spark.conf.set('spark.sql.shuffle.partitions', '2')

# Load and type the orders feed (CSV columns arrive as strings).
rawOrders = (
    spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\orders")
    .toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
)
orders = (
    rawOrders
    .withColumn('order_id', rawOrders['order_id'].cast(IntegerType()))
    .withColumn('order_date', rawOrders['order_date'].cast(DateType()))
    .withColumn('order_customer_id', rawOrders['order_customer_id'].cast(IntegerType()))
)

# Load and type the order_items feed.
rawItems = (
    spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\order_items")
    .toDF('order_item_id', 'order_item_order_id', 'order_item_product_id',
          'order_item_quantity', 'order_item_subtotal', 'order_item_product_price')
)
orderItems = (
    rawItems
    .withColumn('order_item_id', rawItems['order_item_id'].cast(IntegerType()))
    .withColumn('order_item_order_id', rawItems['order_item_order_id'].cast(IntegerType()))
    .withColumn('order_item_product_id', rawItems['order_item_product_id'].cast(IntegerType()))
    .withColumn('order_item_quantity', rawItems['order_item_quantity'].cast(IntegerType()))
    .withColumn('order_item_subtotal', rawItems['order_item_subtotal'].cast(FloatType()))
    .withColumn('order_item_product_price', rawItems['order_item_product_price'].cast(FloatType()))
)

# Revenue per (day, product), restricted to completed/closed orders.
dailyProductRevenue = (
    orders.where('order_status in ("COMPLETE","CLOSED")')
    .join(orderItems, orders.order_id == orderItems.order_item_order_id)
    .groupBy('order_date', 'order_item_product_id')
    .agg(round(sum('order_item_subtotal'), 2).alias('revenue'))
)

# Sort ascending by day, highest revenue first within a day, then persist.
dailyProductRevenueSorted = dailyProductRevenue.orderBy(
    'order_date', dailyProductRevenue.revenue.desc()
)
dailyProductRevenueSorted.write.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\ordersorted")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession | |
def process_partition(partition):
    """Generator for RDD.mapPartitions: emit exactly one value per
    partition -- the sum of all elements in that partition."""
    partition_total = sum(element for element in partition)
    yield partition_total
def process_partition_y_sum(partition):
    """Generator for RDD.mapPartitions: yield the sum of the partition's
    elements (one value per partition).

    Idiom fix: the hand-rolled accumulation loop is replaced with the
    built-in sum(), which iterates at C speed and reads as intent.
    Behavior is unchanged (an empty partition still yields 0).
    """
    yield sum(partition)
def process_partition_y_size(partition):
    """Generator for RDD.mapPartitions: yield one list per partition
    containing len(item) for each item (e.g. line lengths when mapped
    over a text-file RDD).

    Idiom fix: the append loop is replaced with a list comprehension
    (same output, clearer and faster).
    """
    yield [len(item) for item in partition]
if __name__ == "__main__":
    # Demonstrates RDD.mapPartitions: the mapped function is invoked once
    # per partition (receiving an iterator), not once per element.
    spark = (
        SparkSession.builder
        .appName("SparkSession Object")
        .master("local[*]")
        .enableHiveSupport()
        .getOrCreate()
    )

    # Nine numbers spread over three partitions -> three per-partition sums.
    numbers = [1, 1, 1, 2, 2, 2, 3, 3, 3]
    numbers_rdd = spark.sparkContext.parallelize(numbers, 3)
    print(numbers_rdd.getNumPartitions())
    print(numbers_rdd.mapPartitions(process_partition).collect())

    # Disabled experiment kept from the original script.
    """strlist = ["Ally", "Adiva", "Prafulla", "Sagarika"]
    srdd = spark.sparkContext.parallelize(strlist, 2)
    print(srdd.map(upper).collect())"""

    # Same idea over a text file: one list of line lengths per partition.
    text = spark.sparkContext.textFile("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\partitions", 2)
    print(text.getNumPartitions())
    print(text.mapPartitions(process_partition_y_size).collect())

    spark.stop()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Exercise 1: load the products/sales/sellers parquet data sets and answer
# "how many distinct products were sold at least once?" via Spark SQL.
spark = SparkSession.builder\
    .master("local")\
    .config("spark.sql.autoBroadcastJoinThreshold", -1)\
    .config("spark.executor.memory", "500mb")\
    .appName("Exercise1")\
    .getOrCreate()

products = spark.read.parquet('C:\\Users\\Sagarika\\Downloads\\DatasetToCompleteTheSixSparkExercises\\products_parquet')
sales = spark.read.parquet('C:\\Users\\Sagarika\\Downloads\\DatasetToCompleteTheSixSparkExercises\\sales_parquet')
sellers = spark.read.parquet('C:\\Users\\Sagarika\\Downloads\\DatasetToCompleteTheSixSparkExercises\\sellers_parquet')

print("Number of products sold atleast once")
sales.createTempView('sales')
# BUG FIX: the original counted distinct order_id, which answers "how many
# orders were placed", not "how many products sold at least once". Count
# distinct product_id instead -- this matches the DataFrame form the author
# had sketched: sales.select(countDistinct("product_id")).show()
spark.sql('select count(distinct product_id) from sales').show()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession
from pyspark.sql.types import Row

if __name__ == "__main__":
    # Build a tiny DataFrame from in-memory Row objects and display it
    # without truncating column values.
    spark = (
        SparkSession.builder
        .appName("Hello World")
        .master("local[*]")
        .getOrCreate()
    )

    # NOTE(review): both rows carry id=1 and the field name is "cirt"
    # (city?) -- reproduced exactly; confirm with the author before fixing.
    row_data = [
        Row(id=1, name="Prafulla", cirt="Odisha"),
        Row(id=1, name="Sagu", cirt="Odisha"),
    ]
    df = spark.createDataFrame(row_data)
    df.show(10, False)

    spark.stop()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, FloatType
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Top-5 products per day by revenue (DataFrame API with a window rank):
# aggregate revenue per (order_date, product), rank within each day, keep
# ranks <= 5, and show the result sorted by day then revenue descending.
spark = (
    SparkSession.builder
    .master('local')
    .appName('Daily Product Revenue')
    .getOrCreate()
)
# Small local data set -- keep the shuffle partition count low.
spark.conf.set('spark.sql.shuffle.partitions', '2')

# Load and type the orders feed (CSV columns arrive as strings).
rawOrders = (
    spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\orders")
    .toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
)
orders = (
    rawOrders
    .withColumn('order_id', rawOrders['order_id'].cast(IntegerType()))
    .withColumn('order_date', rawOrders['order_date'].cast(DateType()))
    .withColumn('order_customer_id', rawOrders['order_customer_id'].cast(IntegerType()))
)

# Load and type the order_items feed.
rawItems = (
    spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\order_items")
    .toDF('order_item_id', 'order_item_order_id', 'order_item_product_id',
          'order_item_quantity', 'order_item_subtotal', 'order_item_product_price')
)
orderItems = (
    rawItems
    .withColumn('order_item_id', rawItems['order_item_id'].cast(IntegerType()))
    .withColumn('order_item_order_id', rawItems['order_item_order_id'].cast(IntegerType()))
    .withColumn('order_item_product_id', rawItems['order_item_product_id'].cast(IntegerType()))
    .withColumn('order_item_quantity', rawItems['order_item_quantity'].cast(IntegerType()))
    .withColumn('order_item_subtotal', rawItems['order_item_subtotal'].cast(FloatType()))
    .withColumn('order_item_product_price', rawItems['order_item_product_price'].cast(FloatType()))
)

# Revenue per (day, product), restricted to completed/closed orders.
dailyProductRevenue = (
    orders.where('order_status in ("COMPLETE","CLOSED")')
    .join(orderItems, orders.order_id == orderItems.order_item_order_id)
    .groupBy('order_date', 'order_item_product_id')
    .agg(round(sum('order_item_subtotal'), 2).alias('revenue'))
)

# Rank products within each day by revenue (ties share a rank), keep top 5.
spec = Window.partitionBy(dailyProductRevenue.order_date).orderBy(
    dailyProductRevenue.revenue.desc()
)
ranked = dailyProductRevenue.withColumn('rnk', rank().over(spec))
ranked.where(ranked.rnk <= 5).orderBy(ranked.order_date, ranked.revenue.desc()).show()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, DateType, FloatType
from pyspark.sql.functions import *

# Top-N products per day by revenue (Spark SQL variant): compute revenue per
# (order_date, product), rank within each day with a window function, keep
# the top 5 per day, and persist the result as CSV.
spark = SparkSession.\
    builder.\
    master('local').\
    appName('Daily Product Revenue').\
    getOrCreate()
# Tiny local data set -- 2 shuffle partitions instead of the default 200.
spark.conf.set('spark.sql.shuffle.partitions', '2')

# CSV loads as all-string columns: name them, then cast the typed ones.
ordersCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\orders").\
    toDF('order_id', 'order_date', 'order_customer_id', 'order_status')
orders = ordersCSV.\
    withColumn('order_id', ordersCSV.order_id.cast(IntegerType())).\
    withColumn('order_date', ordersCSV.order_date.cast(DateType())).\
    withColumn('order_customer_id', ordersCSV.order_customer_id.cast(IntegerType()))

orderItemCSV = spark.read.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\order_items").\
    toDF('order_item_id', 'order_item_order_id', 'order_item_product_id',
         'order_item_quantity', 'order_item_subtotal', 'order_item_product_price')
orderItems = orderItemCSV.\
    withColumn('order_item_id', orderItemCSV.order_item_id.cast(IntegerType())).\
    withColumn('order_item_order_id', orderItemCSV.order_item_order_id.cast(IntegerType())).\
    withColumn('order_item_product_id', orderItemCSV.order_item_product_id.cast(IntegerType())).\
    withColumn('order_item_quantity', orderItemCSV.order_item_quantity.cast(IntegerType())).\
    withColumn('order_item_subtotal', orderItemCSV.order_item_subtotal.cast(FloatType())).\
    withColumn('order_item_product_price', orderItemCSV.order_item_product_price.cast(FloatType()))

orders.createOrReplaceTempView('orders')
orderItems.createOrReplaceTempView('orderItems')

# BUG FIX: the original selected/grouped by oi.order_item_order_id (the order
# id), so "topNDailyProducts" actually ranked orders, not products. Group by
# the product id instead, matching the DataFrame/window version of this job.
# The inner ORDER BY was also dropped: the ranking window and the final
# ORDER BY below make that pre-sort a wasted shuffle.
dailyProductRevenue = spark.sql('''select o.order_date, oi.order_item_product_id,
    round(sum(order_item_subtotal),2) order_revenue
    from orders o join orderItems oi
    on o.order_id = oi.order_item_order_id
    where o.order_status in ("COMPLETE","CLOSED")
    group by o.order_date, oi.order_item_product_id''')
dailyProductRevenue.createOrReplaceTempView('dailyProductRevenue')

# Keep the top 5 revenue ranks per day; ties share a rank() value.
topNDailyProducts = spark.sql('''select * from (select d.*,
    rank() over (partition by order_date order by order_revenue desc) rnk
    from dailyProductRevenue d) q where rnk <= 5
    order by order_date, order_revenue desc''')
topNDailyProducts.write.csv("C:\\Users\\Sagarika\\Desktop\\data\\retail_db\\topNDailyProducts_sql")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment