import pandas as pd
import numpy as np
from statsforecast import StatsForecast
from statsforecast.models import (
    AutoARIMA,
    SeasonalNaive,
    MSTL,
    AutoETS,
    AutoTheta,
    AutoCES
)
import polars as pl
import multiprocessing
from typing import Dict, List, Tuple
import warnings
from tqdm import tqdm
import logging
from joblib import dump, load

warnings.filterwarnings('ignore')


class ModernEcomForecaster:
    def __init__(
        self,
        orders_df: pd.DataFrame,
        line_items_df: pd.DataFrame,
        products_df: pd.DataFrame,
        forecast_horizon: int = 365,
        season_lengths: List[int] = [7]  # Only weekly seasonality by default
    ):
        self.orders_df = orders_df
        self.line_items_df = line_items_df
        self.products_df = products_df
        self.forecast_horizon = forecast_horizon
        self.season_lengths = season_lengths
        self.models = {}
        self.forecasts = {}
        self.metrics = {}
        # Add logging configuration
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def prepare_data(self) -> Dict[str, pd.DataFrame]:
        """
        Prepare data for forecasting with multiple seasonal patterns
        """
        self.logger.info("Starting data preparation...")
        # Convert timestamps and filter data from 2021 onwards
        self.orders_df['created_at'] = pd.to_datetime(self.orders_df['created_at'])
        self.orders_df = self.orders_df[self.orders_df['created_at'] >= '2021-01-01']
        # Initialize dictionary for the different metrics
        daily_data = {}
        # Daily orders and revenue
        orders_daily = self.orders_df.resample('D', on='created_at').agg({
            'id': 'count',
            'total_price': 'sum'
        }).reset_index()
        # Prepare data in StatsForecast format
        daily_data['orders'] = self._prepare_sf_format(
            orders_daily, 'id', 'Total Orders'
        )
        daily_data['revenue'] = self._prepare_sf_format(
            orders_daily, 'total_price', 'Total Revenue'
        )
        # Average order value; days with zero orders would otherwise
        # produce NaN from the 0/0 division
        aov_daily = orders_daily.assign(
            aov=lambda x: (x['total_price'] / x['id']).fillna(0)
        )
        daily_data['aov'] = self._prepare_sf_format(
            aov_daily, 'aov', 'Average Order Value'
        )
        # Product type distribution
        merged_df = self.line_items_df.merge(
            self.products_df,
            left_on='product_id',
            right_on='id'
        ).merge(
            self.orders_df[['id', 'created_at']],
            left_on='order_id',
            right_on='id'
        )
        # Calculate the number of days with data for each product type
        product_type_days = merged_df.groupby('product_type').agg(
            days_with_data=('created_at', lambda x: x.dt.date.nunique())
        )
        # Filter product types with at least 30 days of history
        # and exclude specific product types and NaN values
        excluded_types = ['Credo Rewards', 'Free', 'Default']
        valid_product_types = product_type_days[
            (product_type_days['days_with_data'] >= 30) &
            (~product_type_days.index.isin(excluded_types)) &
            (product_type_days.index.notna()) &  # Filter out NaN values
            (product_type_days.index.str.contains(','))  # Only keep types with commas
        ].index
        # Add progress bar for product type processing
        for prod_type in tqdm(valid_product_types, desc="Processing product types"):
            prod_daily = merged_df[
                merged_df['product_type'] == prod_type
            ].groupby(
                pd.Grouper(key='created_at', freq='D')
            ).size().reset_index(name='count')
            daily_data[f'product_type_{prod_type}'] = self._prepare_sf_format(
                prod_daily, 'count', f'Product Type: {prod_type}'
            )
        self.logger.info(f"Data preparation complete. Generated {len(daily_data)} time series.")
        return daily_data

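    # The returned dict maps series names to StatsForecast-formatted frames,
    # e.g. 'orders', 'revenue', 'aov', and one 'product_type_<name>' entry
    # per product type that survives the filters above.
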
    def _prepare_sf_format(
        self,
        df: pd.DataFrame,
        value_col: str,
        series_name: str
    ) -> pd.DataFrame:
        """
        Convert data to StatsForecast format
        """
        return df.rename(columns={
            'created_at': 'ds',
            value_col: 'y'
        }).assign(
            unique_id=series_name
        )

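    # A minimal sketch of the long format StatsForecast expects
    # (values illustrative):
    #   unique_id       ds          y
    #   Total Orders    2021-01-01  152
    #   Total Orders    2021-01-02  141
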
    def create_model_ensemble(self, metric: str = None) -> StatsForecast:
        """
        Create an ensemble of models suitable for e-commerce data
        """
        # Product type series get weekly and monthly seasonality only;
        # the main metrics use the full list passed at construction
        if metric and metric.startswith('product_type_'):
            season_lengths = [7, 30]
        else:
            season_lengths = self.season_lengths
        models = [
            # MSTL decomposes multiple seasonal patterns at once
            MSTL(
                season_length=season_lengths,
                trend_forecaster=AutoTheta()
            ),
            # AutoARIMA for trend and short-term patterns
            AutoARIMA(
                season_length=season_lengths[0],
                stepwise=True,
                approximation=False
            ),
            # AutoETS for exponential smoothing (single seasonality only,
            # so it receives just the first season length)
            AutoETS(
                season_length=season_lengths[0],
                model='ZZZ',
                damped=None
            ),
            # AutoTheta for robust trend handling
            AutoTheta(
                season_length=season_lengths[0]
            ),
            # AutoCES for complex exponential smoothing
            AutoCES(
                season_length=season_lengths[0]
            ),
            # SeasonalNaive as baseline
            SeasonalNaive(
                season_length=season_lengths[0]
            )
        ]
        return StatsForecast(
            models=models,
            freq='D',
            n_jobs=multiprocessing.cpu_count()
        )

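    # The ensemble's forecast output has one column per model, named by each
    # model's alias (e.g. 'MSTL', 'AutoARIMA', 'AutoETS', 'AutoTheta',
    # 'SeasonalNaive'; AutoCES typically appears as 'CES'), plus 'unique_id'
    # and 'ds'. There is no combined 'mean' column; any ensemble average must
    # be computed by the caller, as done in train_and_forecast() below.
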
    def detect_events(self, data: pd.DataFrame, include_price: bool = False) -> pd.DataFrame:
        """
        Detect and mark significant events and seasonal patterns
        """
        # Convert to polars for faster processing
        pl_df = pl.from_pandas(data)
        # Add temporal features.
        # Note: polars dt.weekday() is ISO-numbered (Monday=1 ... Sunday=7),
        # unlike pandas (Monday=0), so Friday is 5 and the weekend is {6, 7}
        pl_df = pl_df.with_columns([
            pl.col('ds').dt.month().alias('month'),
            pl.col('ds').dt.weekday().alias('weekday'),
            pl.col('ds').dt.day().alias('day'),
            pl.col('ds').dt.quarter().alias('quarter'),
            pl.col('ds').dt.week().alias('week_of_year'),
            pl.col('ds').dt.ordinal_day().alias('day_of_year')
        ])
        # Mark Black Friday periods: the 4th Friday of November always
        # falls between Nov 22 and Nov 28
        black_friday_mask = (
            (pl_df['month'] == 11) &
            (pl_df['weekday'] == 5) &
            (pl_df['day'] >= 22) &
            (pl_df['day'] <= 28)
        )
        # Mark various seasonal events
        holiday_mask = (pl_df['month'] == 12)  # December holiday season
        summer_sale_mask = (pl_df['month'].is_in([6, 7]))  # Summer sales
        weekend_mask = (pl_df['weekday'].is_in([6, 7]))  # Saturday and Sunday
        quarter_end_mask = (
            (pl_df['month'].is_in([3, 6, 9, 12])) &
            (pl_df['day'] >= 25)
        )  # Quarter-end periods
        # Add event indicators
        pl_df = pl_df.with_columns([
            pl.when(black_friday_mask)
              .then(1)
              .otherwise(0)
              .alias('is_black_friday'),
            pl.when(holiday_mask)
              .then(1)
              .otherwise(0)
              .alias('is_holiday_season'),
            pl.when(summer_sale_mask)
              .then(1)
              .otherwise(0)
              .alias('is_summer_sale'),
            pl.when(weekend_mask)
              .then(1)
              .otherwise(0)
              .alias('is_weekend'),
            pl.when(quarter_end_mask)
              .then(1)
              .otherwise(0)
              .alias('is_quarter_end'),
            # Cyclical encoding for weekly patterns
            (2 * np.pi * pl.col('weekday') / 7).sin().alias('weekday_sin'),
            (2 * np.pi * pl.col('weekday') / 7).cos().alias('weekday_cos'),
            # Cyclical encoding for yearly patterns
            (2 * np.pi * pl.col('day_of_year') / 365).sin().alias('yearly_sin'),
            (2 * np.pi * pl.col('day_of_year') / 365).cos().alias('yearly_cos')
        ])
        # Convert back to pandas
        result_df = pl_df.to_pandas()
        if include_price and 'total_price' not in result_df.columns:
            # Placeholder for future dates, where revenue is not yet known;
            # historical frames keep their real total_price values
            result_df['total_price'] = 0
        return result_df

    def train_and_forecast(self) -> None:
        """
        Train models and generate forecasts for all metrics
        """
        self.logger.info("Starting model training and forecasting process...")
        daily_data = self.prepare_data()
        for metric, data in daily_data.items():
            self.logger.info(f"Processing metric: {metric}")
            # Store the original unique_id and y columns
            unique_id = data['unique_id'].copy()
            y_values = data['y'].copy()
            # Detect and add events
            self.logger.debug(f"Detecting events for {metric}")
            data = self.detect_events(data, include_price=True)
            # Add back the required columns
            data['unique_id'] = unique_id
            data['y'] = y_values
            # Split data for evaluation
            train_cutoff = '2023-06-22'
            train_data = data[data['ds'] <= train_cutoff]
            test_data = data[data['ds'] > train_cutoff]
            self.logger.debug(f"Training data size: {len(train_data)}, Test data size: {len(test_data)}")
            # Create future dates for forecasting; they must start right
            # after the *training* data ends so the exogenous features in
            # X_df line up with the dates being forecast
            future_dates = pd.date_range(
                start=train_data['ds'].max() + pd.Timedelta(days=1),
                periods=self.forecast_horizon,
                freq='D'
            )
            # Create future features DataFrame
            future_df = pd.DataFrame({'ds': future_dates})
            future_df = self.detect_events(future_df, include_price=True)
            # Add unique_id to future_df (required for StatsForecast)
            future_df['unique_id'] = unique_id.iloc[0]  # Same ID as the training data
            # Ensure columns match between training and future data
            required_cols = ['ds', 'month', 'weekday', 'day', 'is_black_friday',
                             'is_holiday_season', 'total_price', 'unique_id', 'y']
            train_data = train_data[required_cols]
            future_df = future_df[[c for c in required_cols if c != 'y']]  # No target in future data
            # Create and train model ensemble
            self.logger.info(f"Creating model ensemble for {metric}")
            model = self.create_model_ensemble(metric)
            # Fit model and generate forecasts
            self.logger.info(f"Training models for {metric}")
            model.fit(train_data)
            self.logger.info(f"Generating forecasts for {metric}")
            # predict() reuses the fitted models; forecast() would refit
            forecast = model.predict(h=self.forecast_horizon, X_df=future_df)
            # Store results
            self.models[metric] = model
            self.forecasts[metric] = forecast
            # Calculate metrics if test data is available
            if not test_data.empty:
                self.logger.info(f"Calculating performance metrics for {metric}")
                # StatsForecast emits one column per model, not a 'mean'
                # column, so average the model columns for an ensemble estimate
                model_cols = [c for c in forecast.columns if c not in ('unique_id', 'ds')]
                y_pred = forecast[model_cols].mean(axis=1).values[:len(test_data)]
                y_true = test_data['y'].values[:len(y_pred)]
                abs_err = np.abs(y_true - y_pred)
                # utilsforecast's loss functions operate on DataFrames, so the
                # point metrics are computed directly with numpy instead.
                # MASE scales by the in-sample MAE of a lag-7 seasonal naive.
                train_y = train_data['y'].values
                naive_mae = np.mean(np.abs(train_y[7:] - train_y[:-7]))
                self.metrics[metric] = {
                    'mae': abs_err.mean(),
                    'mape': np.mean(abs_err / np.maximum(np.abs(y_true), 1e-8)) * 100,
                    'rmse': np.sqrt(np.mean((y_true - y_pred) ** 2)),
                    'mase': abs_err.mean() / naive_mae
                }
                self.logger.info(f"Metrics for {metric}: MAE={self.metrics[metric]['mae']:.2f}, "
                                 f"MAPE={self.metrics[metric]['mape']:.2f}%, "
                                 f"RMSE={self.metrics[metric]['rmse']:.2f}")
        self.logger.info("Model training and forecasting completed successfully")

    def get_best_model(self, metric: str) -> Tuple[str, float]:
        """
        Identify the best performing model for a metric, ranked by MAPE
        over the dates where forecasts and actuals overlap
        """
        metric_forecasts = self.forecasts[metric]
        metric_actuals = self.prepare_data()[metric]
        # Align forecasts and actuals on their overlapping dates
        overlap = metric_forecasts.merge(metric_actuals[['ds', 'y']], on='ds')
        model_performances = {}
        for model in self.models[metric].models:
            # Forecast columns are named after each model's alias
            model_name = model.alias
            predictions = overlap[model_name].values
            actuals = overlap['y'].values
            # Calculate MAPE (guarding against zero actuals)
            mape_score = np.mean(
                np.abs((actuals - predictions) / np.maximum(np.abs(actuals), 1e-8))
            ) * 100
            model_performances[model_name] = mape_score
        best_model = min(model_performances.items(), key=lambda x: x[1])
        return best_model

    def save_forecaster(self, filepath: str) -> None:
        """
        Save the forecaster instance using joblib
        """
        self.logger.info(f"Saving forecaster to {filepath}")
        dump(self, filepath)

    @classmethod
    def load_forecaster(cls, filepath: str) -> 'ModernEcomForecaster':
        """
        Load a saved forecaster instance using joblib
        """
        # joblib.load unpickles arbitrary objects; only load trusted files
        return load(filepath)


def main():
    # Load data
    orders_df = pd.read_csv('formatted/orders.csv')
    line_items_df = pd.read_csv('formatted/line_items.csv')
    products_df = pd.read_csv('formatted/products.csv')
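    # Columns the pipeline relies on (inferred from the code above, not a
    # documented schema): orders.csv -> id, created_at, total_price;
    # line_items.csv -> order_id, product_id; products.csv -> id, product_type.
    # created_at is parsed with pd.to_datetime inside prepare_data().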
    # Initialize forecaster with aligned seasonal lengths
    forecaster = ModernEcomForecaster(
        orders_df=orders_df,
        line_items_df=line_items_df,
        products_df=products_df,
        forecast_horizon=365,
        season_lengths=[7, 30, 365]  # Weekly, monthly, and yearly seasonality
    )
    # Train models and generate forecasts
    forecaster.train_and_forecast()
    # Get best models for each metric
    best_models = {
        metric: forecaster.get_best_model(metric)
        for metric in forecaster.forecasts.keys()
    }
    # Save the forecaster and best models
    forecaster.save_forecaster('forecaster.pkl')
    dump(best_models, 'best_models.pkl')
    return forecaster, best_models


if __name__ == "__main__":
    forecaster, best_models = main()
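
# --- Usage sketch (illustrative; assumes main() has already written the
# pickle files above) ---
#
# forecaster = ModernEcomForecaster.load_forecaster('forecaster.pkl')
# best_models = load('best_models.pkl')
#
# # Each stored forecast has one column per model alias
# revenue_forecast = forecaster.forecasts['revenue']
# print(revenue_forecast.head())
#
# # (model_name, mape) pairs picked by get_best_model()
# print(best_models['revenue'])
# print(forecaster.metrics.get('revenue'))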