import pandas as pd
import numpy as np
from statsforecast import StatsForecast
from statsforecast.models import (
    AutoARIMA,
    SeasonalNaive,
    MSTL,
    AutoETS,
    AutoTheta,
    AutoCES
)
import polars as pl
import multiprocessing
from typing import Dict, List, Tuple
import warnings
from tqdm import tqdm
import logging
from joblib import dump, load

warnings.filterwarnings('ignore')
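
# End-to-end forecasting pipeline for e-commerce order data: builds daily
# time series (total orders, revenue, average order value, and per-product-type
# counts), fits a StatsForecast model ensemble per series, and evaluates
# against a holdout split.
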
class ModernEcomForecaster:
    def __init__(
        self,
        orders_df: pd.DataFrame,
        line_items_df: pd.DataFrame,
        products_df: pd.DataFrame,
        forecast_horizon: int = 365,
        season_lengths: List[int] = [7]  # Only weekly seasonality
    ):
        self.orders_df = orders_df
        self.line_items_df = line_items_df
        self.products_df = products_df
        self.forecast_horizon = forecast_horizon
        self.season_lengths = season_lengths
        self.models = {}
        self.forecasts = {}
        self.metrics = {}

        # Add logging configuration
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
    def prepare_data(self) -> Dict[str, pd.DataFrame]:
        """
        Prepare data for forecasting with multiple seasonal patterns
        """
        self.logger.info("Starting data preparation...")

        # Convert timestamps and filter data from 2021 onward
        self.orders_df['created_at'] = pd.to_datetime(self.orders_df['created_at'])
        self.orders_df = self.orders_df[self.orders_df['created_at'] >= '2021-01-01']

        # Initialize dictionary for different metrics
        daily_data = {}

        # Daily orders and revenue
        orders_daily = self.orders_df.resample('D', on='created_at').agg({
            'id': 'count',
            'total_price': 'sum'
        }).reset_index()

        # Prepare data in StatsForecast format
        daily_data['orders'] = self._prepare_sf_format(
            orders_daily, 'id', 'Total Orders'
        )
        daily_data['revenue'] = self._prepare_sf_format(
            orders_daily, 'total_price', 'Total Revenue'
        )

        # Average order value
        aov_daily = orders_daily.assign(
            aov=lambda x: x['total_price'] / x['id']
        )
        daily_data['aov'] = self._prepare_sf_format(
            aov_daily, 'aov', 'Average Order Value'
        )

        # Product type distribution
        merged_df = self.line_items_df.merge(
            self.products_df,
            left_on='product_id',
            right_on='id'
        ).merge(
            self.orders_df[['id', 'created_at']],
            left_on='order_id',
            right_on='id'
        )

        # Calculate the number of days with data for each product type
        product_type_days = merged_df.groupby('product_type').agg(
            days_with_data=('created_at', lambda x: x.dt.date.nunique())
        )
        # Filter product types with at least 30 days of history
        # and exclude specific product types and NaN values
        excluded_types = ['Credo Rewards', 'Free', 'Default']
        valid_product_types = product_type_days[
            (product_type_days['days_with_data'] >= 30) &
            (~product_type_days.index.isin(excluded_types)) &
            (product_type_days.index.notna()) &  # Filter out NaN values
            (product_type_days.index.str.contains(',', na=False))  # Only keep types with commas
        ].index

        # Add progress bar for product type processing
        for prod_type in tqdm(valid_product_types, desc="Processing product types"):
            prod_daily = merged_df[
                merged_df['product_type'] == prod_type
            ].groupby(
                pd.Grouper(key='created_at', freq='D')
            ).size().reset_index(name='count')

            daily_data[f'product_type_{prod_type}'] = self._prepare_sf_format(
                prod_daily, 'count', f'Product Type: {prod_type}'
            )

        self.logger.info(f"Data preparation complete. Generated {len(daily_data)} time series.")
        return daily_data
    def _prepare_sf_format(
        self,
        df: pd.DataFrame,
        value_col: str,
        series_name: str
    ) -> pd.DataFrame:
        """
        Convert data to the long format StatsForecast expects:
        'unique_id' (series name), 'ds' (timestamp), and 'y' (target)
        """
        return df.rename(columns={
            'created_at': 'ds',
            value_col: 'y'
        }).assign(
            unique_id=series_name
        )
    def create_model_ensemble(self, metric: str = None) -> StatsForecast:
        """
        Create an ensemble of models suitable for e-commerce data
        """
        # Use shorter seasonalities (weekly and monthly) for product type
        # forecasts; main metrics use the full configured seasonality
        if metric and metric.startswith('product_type_'):
            season_lengths = [7, 30]
        else:
            season_lengths = self.season_lengths

        models = [
            # MSTL handles multiple seasonal patterns; the other models
            # below use only the first (weekly) season length
            MSTL(
                season_length=season_lengths,
                trend_forecaster=AutoTheta()
            ),
            # AutoARIMA for trend and short-term patterns
            AutoARIMA(
                season_length=season_lengths[0],
                stepwise=True,
                approximation=False
            ),
            # AutoETS for automatic exponential smoothing model selection
            AutoETS(
                season_length=season_lengths[0],
                model='ZZZ',
                damped=None
            ),
            # AutoTheta for robust trend handling
            AutoTheta(
                season_length=season_lengths[0]
            ),
            # AutoCES for complex exponential smoothing
            AutoCES(
                season_length=season_lengths[0]
            ),
            # SeasonalNaive as baseline
            SeasonalNaive(
                season_length=season_lengths[0]
            )
        ]

        return StatsForecast(
            models=models,
            freq='D',
            n_jobs=multiprocessing.cpu_count()
        )
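
    # Note: StatsForecast treats any training columns beyond 'unique_id',
    # 'ds', and 'y' as exogenous regressors, and the same columns must be
    # supplied for the future dates via X_df at prediction time (see
    # train_and_forecast below).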
    def detect_events(self, data: pd.DataFrame, include_price: bool = False) -> pd.DataFrame:
        """
        Detect and mark significant events and seasonal patterns
        """
        # Convert to polars for faster processing
        pl_df = pl.from_pandas(data)

        # Add temporal features. Note: polars' dt.weekday() uses ISO
        # numbering (Monday = 1 ... Sunday = 7), unlike pandas' 0-6.
        pl_df = pl_df.with_columns([
            pl.col('ds').dt.month().alias('month'),
            pl.col('ds').dt.weekday().alias('weekday'),
            pl.col('ds').dt.day().alias('day'),
            pl.col('ds').dt.quarter().alias('quarter'),
            pl.col('ds').dt.week().alias('week_of_year'),
            pl.col('ds').dt.ordinal_day().alias('day_of_year')
        ])

        # Mark Black Friday periods (4th Friday of November;
        # Friday = 5 in ISO weekday numbering)
        black_friday_mask = (
            (pl_df['month'] == 11) &
            (pl_df['weekday'] == 5) &
            (pl_df['day'] >= 22) &
            (pl_df['day'] <= 28)
        )

        # Mark various seasonal events
        holiday_mask = (pl_df['month'] == 12)  # December holiday season
        summer_sale_mask = (pl_df['month'].is_in([6, 7]))  # Summer sales
        weekend_mask = (pl_df['weekday'].is_in([6, 7]))  # Saturday and Sunday
        quarter_end_mask = (
            (pl_df['month'].is_in([3, 6, 9, 12])) &
            (pl_df['day'] >= 25)
        )  # Quarter-end periods

        # Add event indicators
        pl_df = pl_df.with_columns([
            pl.when(black_friday_mask)
            .then(1)
            .otherwise(0)
            .alias('is_black_friday'),
            pl.when(holiday_mask)
            .then(1)
            .otherwise(0)
            .alias('is_holiday_season'),
            pl.when(summer_sale_mask)
            .then(1)
            .otherwise(0)
            .alias('is_summer_sale'),
            pl.when(weekend_mask)
            .then(1)
            .otherwise(0)
            .alias('is_weekend'),
            pl.when(quarter_end_mask)
            .then(1)
            .otherwise(0)
            .alias('is_quarter_end'),
            # Cyclical encoding for weekly patterns
            (2 * np.pi * pl.col('weekday') / 7).sin().alias('weekday_sin'),
            (2 * np.pi * pl.col('weekday') / 7).cos().alias('weekday_cos'),
            # Cyclical encoding for yearly patterns
            (2 * np.pi * pl.col('day_of_year') / 365).sin().alias('yearly_sin'),
            (2 * np.pi * pl.col('day_of_year') / 365).cos().alias('yearly_cos')
        ])

        # Convert back to pandas
        result_df = pl_df.to_pandas()
        if include_price:
            result_df['total_price'] = 0  # Placeholder value for future dates

        return result_df
    def train_and_forecast(self) -> None:
        """
        Train models and generate forecasts for all metrics
        """
        self.logger.info("Starting model training and forecasting process...")
        daily_data = self.prepare_data()

        for metric, data in daily_data.items():
            self.logger.info(f"Processing metric: {metric}")

            # Store the original unique_id and y columns
            unique_id = data['unique_id'].copy()
            y_values = data['y'].copy()

            # Detect and add events
            self.logger.debug(f"Detecting events for {metric}")
            data = self.detect_events(data, include_price=True)

            # Add back the required columns
            data['unique_id'] = unique_id
            data['y'] = y_values

            # Keep only the feature columns shared by training and future
            # data; StatsForecast treats the extras as exogenous regressors
            feature_cols = ['ds', 'month', 'weekday', 'day', 'is_black_friday',
                            'is_holiday_season', 'total_price', 'unique_id']
            data = data[feature_cols + ['y']]

            # Split data for evaluation
            train_data = data[data['ds'] <= '2023-06-22']
            test_data = data[data['ds'] > '2023-06-22']
            self.logger.debug(f"Training data size: {len(train_data)}, Test data size: {len(test_data)}")

            # Create future dates starting right after the training cutoff,
            # so the exogenous features align with the dates being predicted
            future_dates = pd.date_range(
                start=train_data['ds'].max() + pd.Timedelta(days=1),
                periods=self.forecast_horizon,
                freq='D'
            )

            # Create future features DataFrame with the same exogenous
            # columns plus 'unique_id' and 'ds', but no 'y'
            future_df = pd.DataFrame({'ds': future_dates})
            future_df = self.detect_events(future_df, include_price=True)
            future_df['unique_id'] = unique_id.iloc[0]  # Same ID as training data
            future_df = future_df[feature_cols]

            # Create and train model ensemble
            self.logger.info(f"Creating model ensemble for {metric}")
            model = self.create_model_ensemble(metric)

            # Fit the ensemble, then predict from the end of the training data
            self.logger.info(f"Training models for {metric}")
            model.fit(train_data)

            self.logger.info(f"Generating forecasts for {metric}")
            forecast = model.predict(h=self.forecast_horizon, X_df=future_df)

            # Store results
            self.models[metric] = model
            self.forecasts[metric] = forecast

            # Calculate metrics if test data is available. The prediction
            # DataFrame has one column per model, so evaluate the simple
            # mean across all model columns as the ensemble forecast
            if not test_data.empty:
                self.logger.info(f"Calculating performance metrics for {metric}")
                model_cols = [c for c in forecast.columns if c not in ('unique_id', 'ds')]
                n = min(len(test_data), len(forecast))
                y_true = test_data['y'].values[:n]
                y_pred = forecast[model_cols].mean(axis=1).values[:n]

                # MASE is scaled by the in-sample MAE of a seasonal naive
                # forecast with weekly seasonality
                train_y = train_data['y'].values
                naive_scale = np.mean(np.abs(train_y[7:] - train_y[:-7]))

                self.metrics[metric] = {
                    'mae': float(np.mean(np.abs(y_true - y_pred))),
                    'mape': float(np.mean(np.abs((y_true - y_pred) / y_true)) * 100),
                    'rmse': float(np.sqrt(np.mean((y_true - y_pred) ** 2))),
                    'mase': float(np.mean(np.abs(y_true - y_pred)) / naive_scale)
                }
                self.logger.info(f"Metrics for {metric}: MAE={self.metrics[metric]['mae']:.2f}, "
                                 f"MAPE={self.metrics[metric]['mape']:.2f}%, "
                                 f"RMSE={self.metrics[metric]['rmse']:.2f}")

        self.logger.info("Model training and forecasting completed successfully")
    def get_best_model(self, metric: str) -> Tuple[str, float]:
        """
        Identify the best performing model for a metric by MAPE against the
        most recent actuals (a rough proxy, since the stored forecasts cover
        dates beyond the observed data)
        """
        metric_forecasts = self.forecasts[metric]
        metric_actuals = self.prepare_data()[metric]

        model_performances = {}
        for model in self.models[metric].models:
            # Prediction columns are named after each model's alias
            # (the class name by default)
            model_name = model.__class__.__name__
            predictions = metric_forecasts[model_name].values
            n = min(len(predictions), len(metric_actuals))
            actuals = metric_actuals['y'].values[-n:]

            # Calculate MAPE
            mape_score = np.mean(np.abs((actuals - predictions[:n]) / actuals)) * 100
            model_performances[model_name] = mape_score

        best_model = min(model_performances.items(), key=lambda x: x[1])
        return best_model
    def save_forecaster(self, filepath: str) -> None:
        """
        Save the forecaster instance using joblib
        """
        self.logger.info(f"Saving forecaster to {filepath}")
        dump(self, filepath)

    @classmethod
    def load_forecaster(cls, filepath: str) -> 'ModernEcomForecaster':
        """
        Load a saved forecaster instance using joblib
        """
        return load(filepath)
def main():
    # Load data
    orders_df = pd.read_csv('formatted/orders.csv')
    line_items_df = pd.read_csv('formatted/line_items.csv')
    products_df = pd.read_csv('formatted/products.csv')

    # Initialize forecaster with aligned seasonal lengths
    forecaster = ModernEcomForecaster(
        orders_df=orders_df,
        line_items_df=line_items_df,
        products_df=products_df,
        forecast_horizon=365,
        season_lengths=[7, 30, 365]  # Weekly, monthly, and yearly seasonality
    )

    # Train models and generate forecasts
    forecaster.train_and_forecast()

    # Get best models for each metric
    best_models = {
        metric: forecaster.get_best_model(metric)
        for metric in forecaster.forecasts.keys()
    }

    # Save the forecaster and best models
    forecaster.save_forecaster('forecaster.pkl')
    dump(best_models, 'best_models.pkl')

    return forecaster, best_models


if __name__ == "__main__":
    forecaster, best_models = main()
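
# Example of reloading the saved artifacts in a later session
# (hypothetical follow-up usage, not executed by this script):
# forecaster = ModernEcomForecaster.load_forecaster('forecaster.pkl')
# print(forecaster.forecasts['orders'].head())   # one column per model
# print(forecaster.metrics.get('orders'))        # holdout metrics, if any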