Created
May 16, 2024 17:29
-
-
Save santiviquez/aa224c6e232c8bd2534893888981564d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| import nannyml as nml | |
| import pandas as pd | |
class DriftSignalsModel:
    """Regress a realized performance metric on per-chunk drift signals.

    The class wires together several NannyML calculators (univariate drift,
    data-reconstruction drift, domain classifier and realized performance),
    turns their per-chunk outputs into a feature matrix and fits the supplied
    regressor to predict the performance metric from those drift signals.

    Parameters
    ----------
    y_pred_proba, y_pred, y_true
        Forwarded unchanged to ``nml.PerformanceCalculator``.
    features
        Feature column names monitored for drift.
    treat_as_cat
        Feature names forwarded as ``treat_as_categorical``; their signal
        columns are looked up using ``cat_methods``. ``None`` is treated as
        "no forced categorical features".
    metric
        A single metric name (string). It is forwarded as ``metrics`` to the
        performance calculator and ``metric + '_value'`` is the target column.
    regressor
        Object exposing ``fit(X, y)`` and ``predict(X)``.
    cat_methods, cont_methods
        Drift methods for categorical / continuous features. Default to
        ``['jensen_shannon']`` (a fresh list per instance).
    method_type
        ``'univariate'``, ``'multivariate'`` or ``'both'``.
    problem_type
        Forwarded to ``nml.PerformanceCalculator``.
    chunk_size
        Chunk size forwarded to every calculator.

    Notes
    -----
    NOTE(review): every feature not listed in ``treat_as_cat`` is looked up
    under ``cont_methods``. If ``cat_methods`` and ``cont_methods`` differ,
    confirm that NannyML does not auto-detect additional categorical columns,
    otherwise the expected column names will not exist.
    """

    _METHOD_TYPES = ('univariate', 'multivariate', 'both')

    def __init__(
        self,
        y_pred_proba,
        y_pred,
        y_true,
        features,
        treat_as_cat,
        metric,
        regressor,
        cat_methods=None,
        cont_methods=None,
        method_type='both',
        problem_type='classification_binary',
        chunk_size=2000,
    ):
        self.y_pred_proba = y_pred_proba
        self.y_pred = y_pred
        self.y_true = y_true
        self.features = features
        self.treat_as_cat = treat_as_cat
        self.metric = metric
        self.regressor = regressor
        # Build the defaults per instance: a list literal in the signature
        # would be shared (and mutable) across all instances.
        self.cat_methods = ['jensen_shannon'] if cat_methods is None else cat_methods
        self.cont_methods = ['jensen_shannon'] if cont_methods is None else cont_methods
        self.method_type = method_type
        self.problem_type = problem_type
        self.chunk_size = chunk_size

    def _check_method_type(self):
        """Raise ``ValueError`` when ``method_type`` is not a supported value."""
        if self.method_type not in self._METHOD_TYPES:
            raise ValueError(
                f"method_type must be one of {self._METHOD_TYPES}, "
                f"got {self.method_type!r}"
            )

    def _select_univariate_signals(self, results_df):
        """Return the univariate drift value columns of ``results_df``.

        Continuous columns come first (in ``features`` order), followed by the
        categorical ones (in ``treat_as_cat`` order). The order is
        deterministic so the regressor sees the same column layout at fit and
        estimate time, even across processes. ``'chunk_period'`` is included
        only when univariate signals are the sole signal source; otherwise the
        domain-classifier frame supplies it.
        """
        categorical = list(self.treat_as_cat or [])
        categorical_lookup = set(categorical)
        # dict.fromkeys de-duplicates while preserving the caller's order.
        continuous = [
            name for name in dict.fromkeys(self.features)
            if name not in categorical_lookup
        ]
        columns = [
            f'{name}_{method}_value'
            for name in continuous
            for method in self.cont_methods
        ]
        columns += [
            f'{name}_{method}_value'
            for name in categorical
            for method in self.cat_methods
        ]
        if self.method_type == 'univariate':
            columns.append('chunk_period')
        return results_df[columns]

    def fit_calculators(self, reference):
        """Fit all calculators on ``reference`` and build the training set.

        Creates and fits the calculators selected by ``method_type`` plus the
        performance calculator, storing them on ``self.uni_calc``,
        ``self.data_recostruction_calc``, ``self.dm_calc`` and
        ``self.performance_calc``.

        Returns
        -------
        tuple
            ``(X, y)`` where ``X`` holds the drift-signal columns and ``y`` the
            ``metric + '_value'`` column, both restricted to rows whose
            ``chunk_period`` is ``'reference'``.

        Raises
        ------
        ValueError
            If ``method_type`` is not a supported value.
        """
        self._check_method_type()
        drift_signals = []

        if self.method_type in ('univariate', 'both'):
            # Univariate drift
            self.uni_calc = nml.UnivariateDriftCalculator(
                column_names=self.features,
                continuous_methods=self.cont_methods,
                categorical_methods=self.cat_methods,
                treat_as_categorical=self.treat_as_cat,
                chunk_size=self.chunk_size,
            )
            self.uni_calc.fit(reference)
            uni_drift_signals_df = self.uni_calc.result.to_df(multilevel=False)
            drift_signals.append(self._select_univariate_signals(uni_drift_signals_df))

        if self.method_type in ('multivariate', 'both'):
            # Data reconstruction error
            # (attribute name keeps its historical spelling for compatibility)
            self.data_recostruction_calc = nml.DataReconstructionDriftCalculator(
                column_names=self.features,
                chunk_size=self.chunk_size,
            )
            self.data_recostruction_calc.fit(reference)
            reconstruction_df = self.data_recostruction_calc.result.to_df(multilevel=False)
            drift_signals.append(reconstruction_df['reconstruction_error_value'])

            # Domain classifier; this frame also carries 'chunk_period'.
            self.dm_calc = nml.DomainClassifierCalculator(
                feature_column_names=self.features,
                treat_as_categorical=self.treat_as_cat,
                chunk_size=self.chunk_size,
            )
            self.dm_calc.fit(reference)
            domain_clf_df = self.dm_calc.result.to_df(multilevel=False)
            drift_signals.append(
                domain_clf_df[['domain_classifier_auroc_value', 'chunk_period']]
            )

        # Realized performance (this will be the y_true of the drift signal models)
        self.performance_calc = nml.PerformanceCalculator(
            y_pred_proba=self.y_pred_proba,
            y_pred=self.y_pred,
            y_true=self.y_true,
            problem_type=self.problem_type,
            metrics=self.metric,
            chunk_size=self.chunk_size,
        )
        self.performance_calc.fit(reference)
        performance_df = self.performance_calc.result.to_df(multilevel=False)
        performance_column = self.metric + '_value'

        drift_signals_df = pd.concat(drift_signals, axis=1)
        reference_rows = drift_signals_df['chunk_period'] == 'reference'
        signals_reference_X = drift_signals_df[reference_rows].drop(columns='chunk_period')
        signals_reference_y = performance_df[
            performance_df['chunk_period'] == 'reference'
        ][performance_column]
        return signals_reference_X, signals_reference_y

    def fit(self, df_reference):
        """Fit the calculators and the regressor on ``df_reference``.

        Stores the data on ``self.reference`` and the fitted regressor on
        ``self.signals_model``.
        """
        self.reference = df_reference
        signals_X, signals_y = self.fit_calculators(self.reference)
        fitted = self.regressor.fit(signals_X, signals_y)
        # Some estimators fit in place and return None; fall back to the
        # regressor object itself so ``estimate`` always has a model.
        self.signals_model = fitted if fitted is not None else self.regressor

    def estimate(self, chunk):
        """Estimate performance for ``chunk`` from its drift signals.

        Runs the fitted calculators on ``chunk``, keeps the rows whose
        ``chunk_period`` is ``'analysis'`` and passes them to
        ``self.signals_model.predict``. Requires ``fit`` to have been called.

        Returns
        -------
        Whatever ``self.signals_model.predict`` returns.

        Raises
        ------
        ValueError
            If ``method_type`` is not a supported value.
        """
        self._check_method_type()
        drift_signals = []

        if self.method_type in ('univariate', 'both'):
            uni_drift_signals_df = self.uni_calc.calculate(chunk).to_df(multilevel=False)
            drift_signals.append(self._select_univariate_signals(uni_drift_signals_df))

        if self.method_type in ('multivariate', 'both'):
            reconstruction_df = self.data_recostruction_calc.calculate(chunk).to_df(
                multilevel=False
            )
            drift_signals.append(reconstruction_df['reconstruction_error_value'])

            domain_clf_df = self.dm_calc.calculate(chunk).to_df(multilevel=False)
            drift_signals.append(
                domain_clf_df[['domain_classifier_auroc_value', 'chunk_period']]
            )

        drift_signals_df = pd.concat(drift_signals, axis=1)
        analysis_rows = drift_signals_df['chunk_period'] == 'analysis'
        signals_chunk_X = drift_signals_df[analysis_rows].drop(columns='chunk_period')
        return self.signals_model.predict(signals_chunk_X)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment