# Source: GitHub gist by @SuzieJi (last active March 29, 2021 07:23).
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
from fbprophet import Prophet
import pickle
from sklearn.metrics import mean_absolute_error
# Read the historical product demand table from Incorta and reduce it to
# the four columns the forecast needs: an ISO date string, the demand,
# the product category, and the warehouse.
prod_demand_df = (
    read("TimeSeriesNotebooks.HISTORICAL_PRODUCT_DEMAND")
    .withColumnRenamed("Date", "Order_Date")
    .select(
        date_format(col("Order_Date"), 'yyyy-MM-dd').alias("Order_Date_Str"),
        col("Order_Demand"),
        col("Product_Category"),
        col("Warehouse"),
    )
)
# Pull the (filtered-column) Spark frame into pandas for local wrangling.
pdf = prod_demand_df.toPandas()
# Filter to a single product category within a single warehouse.
# .copy() makes this an independent frame, so the column assignment
# below does not trigger SettingWithCopyWarning (and is not silently
# dropped under pandas copy-on-write semantics).
npdf = pdf.loc[(pdf.Product_Category == 'Category_028') & (pdf.Warehouse == "Whse_J")].copy()
# Build a real datetime column and index by it so resample() works.
npdf['pd_Datetime'] = pd.to_datetime(npdf['Order_Date_Str'] + ' 00:00:00')
npdf = npdf.set_index(pd.DatetimeIndex(npdf['pd_Datetime']))
monthly_npdf = pd.DataFrame()
# Aggregate the order demand by month ('MS' = month-start bins).
monthly_npdf['Order_Demand'] = npdf['Order_Demand'].resample('MS').sum()
# Carry the month-start dates as an ordinary column as well.
monthly_npdf['Order_Date'] = monthly_npdf.index
# Keep only history through 2016 — 2017 is the forecast horizon.
monthly_npdf = monthly_npdf.loc[(monthly_npdf.Order_Date <= pd.to_datetime('2016-12-31 00:00:00'))]
# Prophet expects the value column named 'y' and the date column 'ds'.
monthly_npdf.columns = ['y', 'ds']
# Load the pre-trained Prophet model from disk.
ml_model_path = "/home/incorta/IncortaAnalytics/Tenants/demo/data/ml_model/" + "Order_Demand_Model.pckl"
# NOTE(review): pickle.load is only safe on trusted files; this model
# file comes from our own training job, so that holds here.
with open(ml_model_path, 'rb') as fin:
    prophet = pickle.load(fin)
# Forecast horizon: one row per month-start date of 2017, in the single
# 'ds' column Prophet expects. pd.date_range with freq='MS' produces
# exactly the twelve dates the original '2017-%02d' loop built.
future = pd.DataFrame({'ds': pd.date_range('2017-01-01', periods=12, freq='MS')})
# Use the loaded model to forecast monthly demand for 2017.
forecast = prophet.predict(future)
# Keep only the date and the point estimate. .copy() so the tag-column
# assignment below does not hit SettingWithCopyWarning on a slice view.
product_result = forecast[['ds', 'yhat']].copy()
product_result['Product_Category'] = 'Category_028'
# Tag the historical rows the same way, then stack history + forecast.
monthly_npdf['Product_Category'] = 'Category_028'
# DataFrame.append was removed in pandas 2.0; pd.concat is the drop-in
# replacement with the same row order and index behavior.
product_result = pd.concat([product_result, monthly_npdf])
# Persist the combined frame back through Incorta.
result_df = spark.createDataFrame(product_result)
save(result_df)
# End of gist.