Skip to content

Instantly share code, notes, and snippets.

@pdxjohnny
Last active February 1, 2021 17:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pdxjohnny/b535ece79d1f1c4fd9666ec6be0312b1 to your computer and use it in GitHub Desktop.
Save pdxjohnny/b535ece79d1f1c4fd9666ec6be0312b1 to your computer and use it in GitHub Desktop.
import pathlib
from typing import AsyncIterator, Type
from sktime.datasets import load_airline
from sktime.forecasting.base import ForecastingHorizon
from sktime.forecasting.exp_smoothing import ExponentialSmoothing
from sktime.performance_metrics.forecasting import sMAPE, smape_loss
import pandas as pd
import numpy as np
from dffml import (
Accuracy,
Feature,
Features,
ModelNotTrained,
Record,
SimpleModel,
SourcesContext,
config,
entrypoint,
field,
)
@config
class ExpModelConfig:
    """
    Configuration for the exponential smoothing forecasting model.

    ``features`` and ``predict`` name the record fields the model reads,
    ``directory`` is where the fitted state is saved, ``spd`` is the seasonal
    periodicity and ``fh`` is the forecasting horizon handed to the
    forecaster's ``predict``.
    """

    features: Features = field("Features to train on")
    predict: Feature = field("Label or the value to be predicted")
    directory: pathlib.Path = field("Directory where state should be saved")
    spd: int = field("Seasonal Periodicity", default=12)
    # Inside the class body ``spd`` is the object returned by ``field(...)``
    # on the line above, not the integer 12, so it cannot be reused as the
    # default here. The value is spelled out instead.
    fh: int = field("Forecasting horizon", default=12)
@entrypoint("expsmoothing")
class ExpModel(SimpleModel):
    """
    Univariate forecasting model backed by sktime's ``ExponentialSmoothing``.

    Training fits the forecaster on the first configured feature and saves it
    under the configured directory. ``accuracy`` scores forecasts with sMAPE
    and ``predict`` yields one record per forecast step.
    """

    # The configuration class needs to be set as the CONFIG property
    CONFIG: Type = ExpModelConfig
    # Class-level default so the "not trained" checks raise ModelNotTrained
    # (rather than AttributeError) when train() has not run on this instance.
    forecaster = None

    @property
    def forecaster_filepath(self) -> pathlib.Path:
        """Path of the serialized forecaster inside the configured directory."""
        return pathlib.Path(self.parent.config.directory) / "forecaster.joblib"

    def _require_forecaster(self, message: str):
        """
        Return the fitted forecaster, loading it from disk when necessary.

        Raises ``ModelNotTrained`` with ``message`` when no forecaster is held
        in memory and no saved forecaster file exists.
        """
        if self.forecaster is None and self.forecaster_filepath.is_file():
            # joblib was referenced but never imported by the original code
            import joblib

            # NOTE(review): joblib.load unpickles the file, so ``directory``
            # must only ever point at trusted locations.
            self.forecaster = joblib.load(str(self.forecaster_filepath))
        if self.forecaster is None:
            raise ModelNotTrained(message)
        return self.forecaster

    async def train(self, sources: SourcesContext) -> None:
        """
        Fit the forecaster on the first configured feature and save it.

        Only records that carry that feature are used; their values are
        collected, in the order the source yields them, into a single series.
        """
        # joblib was referenced but never imported by the original code
        import joblib

        feature_name = self.parent.config.features[0].name
        # Fit demands that data be univariate
        series = pd.Series(
            [
                record.feature(feature_name)
                async for record in sources.with_features([feature_name])
            ]
        )
        # Use self.logger to report how many records are being used for training
        self.logger.debug("Number of training records: %d", len(series))
        self.forecaster = ExponentialSmoothing(
            trend="add",
            seasonal="multiplicative",
            # Honour the configured periodicity instead of a hard-coded 12
            sp=self.parent.config.spd,
        )
        self.forecaster.fit(series)
        # Save the trained model
        self.forecaster_filepath.parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(self.forecaster, str(self.forecaster_filepath))

    async def accuracy(self, sources: SourcesContext) -> Accuracy:
        """
        Score forecasts against the values found in ``sources``.

        We use the sMAPE (symmetric mean absolute percentage error) to
        quantify the accuracy of our forecasts. A lower sMAPE means higher
        accuracy.

        Raises ``ModelNotTrained`` if no fitted forecaster is available and
        ``ValueError`` if ``sources`` holds no record with the predicted
        feature.
        """
        forecaster = self._require_forecaster(
            "Train the model before assessing accuracy"
        )
        target = self.parent.config.predict.name
        actual = [
            record.feature(target)
            async for record in sources.with_features([target])
        ]
        if not actual:
            raise ValueError(
                "No records with feature %r to assess accuracy on" % (target,)
            )
        # NOTE(review): assumes the test records are the observations that
        # immediately follow the training series, so they map onto steps
        # 1..n ahead. Confirm against how callers split their data.
        foh = ForecastingHorizon(
            np.arange(1, len(actual) + 1), is_relative=True
        )
        y_pred = forecaster.predict(foh)
        # Give the observed values the forecast's index so the two series
        # line up position by position.
        y_test = pd.Series(actual, index=y_pred.index)
        return Accuracy(smape_loss(y_test, y_pred))

    async def predict(self, sources: SourcesContext) -> AsyncIterator[Record]:
        """
        Uses saved model to make predictions for a forecast horizon.

        The horizon comes from the ``fh`` configuration value; ``sources`` is
        not consulted. One record is yielded per forecast step, keyed by the
        step's index label, with the forecast stored as the prediction for the
        configured ``predict`` feature.

        Raises ``ModelNotTrained`` if no fitted forecaster is available.
        """
        forecaster = self._require_forecaster(
            "Train the model first before getting predictions"
        )
        target = self.parent.config.predict.name
        # presumably a pandas Series indexed by forecast time point
        predictions = forecaster.predict(self.parent.config.fh)
        for step, value in predictions.items():
            record = Record(str(step))
            # No per-step confidence is computed here, hence NaN
            record.predicted(target, float(value), float("nan"))
            yield record
from dffml import Feature, Features
from dffml.noasync import accuracy, predict, train
from foremodel import ExpModel
import numpy as np
import pandas as pd
from sktime.datasets import load_airline
from sktime.forecasting.base import ForecastingHorizon
from sktime.forecasting.model_selection import temporal_train_test_split
from sktime.performance_metrics.forecasting import sMAPE, smape_loss
# Load the airline passengers series and expose it as a frame whose
# "Passengers" column holds the observations.
y = load_airline()
y_new = y.to_frame(name="Passengers")
df = y_new.reset_index()
# Hold out the last 36 observations for testing
y_train, y_test = temporal_train_test_split(df, test_size=36)
# Relative forecasting horizon: steps 1..len(y_test) ahead of the training data
foh = np.arange(len(y_test)) + 1
# Configure the model
model = ExpModel(
    features=Features(Feature("Passengers", float, 1),),
    predict=Feature("Passengers", float, 1),
    directory="model",
    spd=12,
    fh=foh,
)
# The high-level API is fed records as plain dicts of feature values, so each
# row is converted instead of passing the DataFrame itself. Only the
# "Passengers" column is forwarded because it is the only configured feature.
train_records = y_train[["Passengers"]].to_dict("records")
test_records = y_test[["Passengers"]].to_dict("records")
# Train the model
train(model, *train_records)
# Assess accuracy
print("", accuracy(model, *test_records))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment