Skip to content

Instantly share code, notes, and snippets.

@santiviquez
Created May 16, 2024 17:29
Show Gist options
  • Select an option

  • Save santiviquez/aa224c6e232c8bd2534893888981564d to your computer and use it in GitHub Desktop.

Select an option

Save santiviquez/aa224c6e232c8bd2534893888981564d to your computer and use it in GitHub Desktop.
import numpy as np
import nannyml as nml
import pandas as pd
class DriftSignalsModel:
    """Regress a realized performance metric on per-chunk drift signals.

    NannyML drift calculators are fitted on a reference dataset and their
    per-chunk outputs ("drift signals") become the feature matrix of a
    user-supplied regressor. The regression target is the realized
    performance computed by ``nml.PerformanceCalculator`` on the same
    reference chunks. After fitting, :meth:`estimate` feeds the drift
    signals of new data to the regressor to predict the metric.

    Parameters
    ----------
    y_pred_proba, y_pred, y_true :
        Forwarded unchanged to ``nml.PerformanceCalculator``.
    features : list of str
        Feature columns monitored for drift.
    treat_as_cat : list of str or None
        Subset of ``features`` handled as categorical. ``None`` is treated
        as "no categorical features" when drift-signal columns are named.
    metric : str
        A single metric name. It is forwarded to the performance calculator
        and ``metric + '_value'`` is the result column used as the target.
    regressor :
        Object exposing ``fit(X, y)`` and ``predict(X)``.
    cat_methods, cont_methods : list of str, optional
        Univariate drift methods for categorical / continuous features.
        Each defaults to ``['jensen_shannon']``.
    method_type : {'univariate', 'multivariate', 'both'}
        Which family of drift signals to use.
    problem_type : str
        Forwarded to ``nml.PerformanceCalculator``.
    chunk_size : int
        Chunk size handed to every calculator.
    """

    _VALID_METHOD_TYPES = ('univariate', 'multivariate', 'both')

    def __init__(
        self,
        y_pred_proba,
        y_pred,
        y_true,
        features,
        treat_as_cat,
        metric,
        regressor,
        cat_methods=None,
        cont_methods=None,
        method_type='both',
        problem_type='classification_binary',
        chunk_size=2000,
    ):
        self.y_pred_proba = y_pred_proba
        self.y_pred = y_pred
        self.y_true = y_true
        self.features = features
        self.treat_as_cat = treat_as_cat
        self.metric = metric
        self.regressor = regressor
        # A fresh list per instance: a list literal used as a default
        # argument would be shared (and mutable) across all instances.
        self.cat_methods = ['jensen_shannon'] if cat_methods is None else cat_methods
        self.cont_methods = ['jensen_shannon'] if cont_methods is None else cont_methods
        self.method_type = method_type
        self.problem_type = problem_type
        self.chunk_size = chunk_size

    def _check_method_type(self):
        """Raise ``ValueError`` when ``method_type`` is not a supported value."""
        if self.method_type not in self._VALID_METHOD_TYPES:
            raise ValueError(
                f"method_type must be one of {self._VALID_METHOD_TYPES}, "
                f"got {self.method_type!r}"
            )

    def _uses_univariate(self):
        """Return True when univariate drift signals are part of the model."""
        return self.method_type in ('univariate', 'both')

    def _uses_multivariate(self):
        """Return True when multivariate drift signals are part of the model."""
        return self.method_type in ('multivariate', 'both')

    def _univariate_drift_columns(self):
        """Return the univariate drift value column names in a stable order.

        Continuous features come first, in the order they appear in
        ``features`` (duplicates dropped), followed by the categorical ones
        in ``treat_as_cat`` order. A deterministic order keeps the regressor's
        feature matrix laid out identically at fit and predict time.
        """
        categorical = list(self.treat_as_cat or [])
        categorical_lookup = set(categorical)
        continuous = list(dict.fromkeys(
            name for name in self.features if name not in categorical_lookup
        ))
        columns = [
            f'{name}_{method}_value'
            for name in continuous
            for method in self.cont_methods
        ]
        columns += [
            f'{name}_{method}_value'
            for name in categorical
            for method in self.cat_methods
        ]
        return columns

    def _select_drift_signals(self, uni_df=None, reconstruction_df=None, domain_clf_df=None):
        """Pick the drift-signal columns from result frames and join them column-wise.

        Exactly one of the joined pieces carries ``chunk_period`` so that the
        combined frame has a single such column to filter on.
        """
        signals = []
        if uni_df is not None:
            columns = self._univariate_drift_columns()
            if self.method_type == 'univariate':
                # No multivariate frame will supply the period column.
                columns.append('chunk_period')
            signals.append(uni_df[columns])
        if reconstruction_df is not None and domain_clf_df is not None:
            signals.append(reconstruction_df['reconstruction_error_value'])
            signals.append(domain_clf_df[['domain_classifier_auroc_value', 'chunk_period']])
        return pd.concat(signals, axis=1)

    def fit_calculators(self, reference):
        """Fit the drift and performance calculators on ``reference``.

        Parameters
        ----------
        reference :
            Reference dataset passed to each calculator's ``fit``.

        Returns
        -------
        tuple
            ``(X, y)`` where ``X`` holds the drift signals of the rows whose
            ``chunk_period`` is ``'reference'`` and ``y`` the matching
            ``<metric>_value`` column of the performance results.

        Raises
        ------
        ValueError
            If ``method_type`` is not a supported value.
        """
        self._check_method_type()
        uni_df = reconstruction_df = domain_clf_df = None

        if self._uses_univariate():
            # Univariate drift
            self.uni_calc = nml.UnivariateDriftCalculator(
                column_names=self.features,
                continuous_methods=self.cont_methods,
                categorical_methods=self.cat_methods,
                treat_as_categorical=self.treat_as_cat,
                chunk_size=self.chunk_size,
            )
            self.uni_calc.fit(reference)
            uni_df = self.uni_calc.result.to_df(multilevel=False)

        if self._uses_multivariate():
            # Data reconstruction error
            # NOTE: the attribute name keeps its historical spelling so that
            # existing code accessing it continues to work.
            self.data_recostruction_calc = nml.DataReconstructionDriftCalculator(
                column_names=self.features,
                chunk_size=self.chunk_size,
            )
            self.data_recostruction_calc.fit(reference)
            reconstruction_df = self.data_recostruction_calc.result.to_df(multilevel=False)

            # Domain classifier
            self.dm_calc = nml.DomainClassifierCalculator(
                feature_column_names=self.features,
                treat_as_categorical=self.treat_as_cat,
                chunk_size=self.chunk_size,
            )
            self.dm_calc.fit(reference)
            domain_clf_df = self.dm_calc.result.to_df(multilevel=False)

        # Realized performance (this will be the y_true of the drift signal models)
        self.performance_calc = nml.PerformanceCalculator(
            y_pred_proba=self.y_pred_proba,
            y_pred=self.y_pred,
            y_true=self.y_true,
            problem_type=self.problem_type,
            metrics=self.metric,
            chunk_size=self.chunk_size,
        )
        self.performance_calc.fit(reference)
        performance_df = self.performance_calc.result.to_df(multilevel=False)
        performance_column = self.metric + '_value'

        signals_df = self._select_drift_signals(uni_df, reconstruction_df, domain_clf_df)
        reference_X = signals_df[signals_df['chunk_period'] == 'reference'].drop(
            columns='chunk_period'
        )
        reference_y = performance_df[performance_df['chunk_period'] == 'reference'][
            performance_column
        ]
        return reference_X, reference_y

    def fit(self, df_reference):
        """Fit the calculators and then the regressor on the reference data.

        The reference frame is stored on ``self.reference`` and the fitted
        regressor on ``self.signals_model``.
        """
        self.reference = df_reference
        reference_X, reference_y = self.fit_calculators(self.reference)
        fitted = self.regressor.fit(reference_X, reference_y)
        # Fall back to the regressor itself for estimators whose ``fit``
        # mutates in place and returns None.
        self.signals_model = self.regressor if fitted is None else fitted

    def estimate(self, chunk):
        """Predict the performance metric for ``chunk`` from its drift signals.

        Parameters
        ----------
        chunk :
            Data passed to each fitted calculator's ``calculate``.

        Returns
        -------
        Whatever ``signals_model.predict`` returns for the rows whose
        ``chunk_period`` is ``'analysis'``.

        Raises
        ------
        ValueError
            If ``method_type`` is not a supported value.
        """
        self._check_method_type()
        uni_df = reconstruction_df = domain_clf_df = None

        if self._uses_univariate():
            uni_df = self.uni_calc.calculate(chunk).to_df(multilevel=False)

        if self._uses_multivariate():
            reconstruction_df = self.data_recostruction_calc.calculate(chunk).to_df(
                multilevel=False
            )
            domain_clf_df = self.dm_calc.calculate(chunk).to_df(multilevel=False)

        signals_df = self._select_drift_signals(uni_df, reconstruction_df, domain_clf_df)
        analysis_X = signals_df[signals_df['chunk_period'] == 'analysis'].drop(
            columns='chunk_period'
        )
        return self.signals_model.predict(analysis_X)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment