# Gist: elutins/c114e3c6fed946dd7a5b1b6269dc4efd (last active April 12, 2024 18:16)
# Demo metaflow for running drift detection
"""Simple flow to demonstrate how to run feature drift detection with Evidently on some dummy data."""
from __future__ import annotations

import os

import numpy as np
import pandas as pd
from metaflow import FlowSpec, batch, card, step  # pylint: disable=no-name-in-module

# BaseDriftParameters found in gist: https://gist.github.com/elutins/14db8768fef4ebd68c197d41b74f93f4
from tiers_utils.base_drift_parameters import BaseDriftParameters
# ---------------------------------------------------------------------------
# NOTE(review): ``enable_decorator``, ``EVIDENTLY_IMAGE`` and ``BATCH_ENABLE``
# were referenced by the ``end`` step's decorator but never imported or defined
# in this file, so merely importing the module raised ``NameError``.  The
# fallbacks below make the file self-contained while preserving the intended
# semantics (apply ``@batch`` only when batch execution is enabled).  If your
# project already provides these names in a shared utility module, import them
# from there instead and delete these fallbacks.
# ---------------------------------------------------------------------------
BATCH_ENABLE: bool = os.environ.get("BATCH_ENABLE", "false").lower() in {"1", "true", "yes"}
EVIDENTLY_IMAGE: str = os.environ.get("EVIDENTLY_IMAGE", "")


def enable_decorator(decorator, enabled):
    """Apply ``decorator`` to a step function only when ``enabled`` is truthy.

    Args:
        decorator: A parameterized Metaflow decorator, e.g. ``batch(cpu=1, ...)``.
        enabled: Gate flag; when falsy the step function is returned unchanged.

    Returns:
        A decorator that either applies ``decorator`` or is a no-op.
    """

    def _apply(func):
        return decorator(func) if enabled else func

    return _apply


class DemoDriftFlow(FlowSpec):
    """A demo drift flow to use for demo purposes."""

    @step
    def start(self):
        """Creates a reference and current dataset of 5 rows each for demonstration purposes."""
        rng = np.random.default_rng()  # one generator reused for all random draws

        # create dummy data for reference dataset. each column will have 5 values
        ref_nulls = [None] * 2 + [1] * 3
        ref_categoricals = ["a", "b", "c", "d", "E"]
        ref_rates = rng.random(size=5)
        self.df_reference = pd.DataFrame(
            {
                "null_field": ref_nulls,
                "categorical_field": ref_categoricals,
                "numeric_field": ref_rates,
            }
        )

        # create dummy data for current dataset. again each column will have 5 values
        # - 'null_field' will fail: the threshold is set to 20% in BaseDriftParameters
        #   (see CUSTOM_NULL_THRESHOLD_MAPPING below), but this column is 60% null.
        # - 'categorical_field' TestShareOfOutListValues will fail: 'F' (1/5 = 20% of
        #   the values) never appears in the reference dataset.
        current_nulls = [None] * 3 + [1] * 2
        current_categoricals = ["a", "b", "c", "d", "F"]
        # purposefully setting these rates to be between 1-10 so TestMeanInNSigmas will fail
        current_rates = rng.uniform(1, 10, size=4)
        # appending None upcasts the array to dtype=object; pandas treats it as a missing value
        current_rates = np.append(current_rates, None)
        self.df_current = pd.DataFrame(
            {
                "null_field": current_nulls,
                "categorical_field": current_categoricals,
                "numeric_field": current_rates,
            }
        )

        ###############################################################
        # showing how to customize with an added test to the 'numeric_field'. All custom tests must be in the format:
        # {column_name: {test: evidently-test-name, parameters: {param1: value1}}, column_name2: ... }
        ###############################################################
        from evidently.tests import TestValueRange

        CUSTOM_COLUMN_TESTS = {"numeric_field": {"test": TestValueRange, "parameters": {"left": 0, "right": 1.0}}}

        ###############################################################
        # showing how to customize threshold of failure for certain tests and columns
        ###############################################################
        CUSTOM_NULL_THRESHOLD_MAPPING: dict[str, float] = {"null_field": 0.2}
        CUSTOM_N_SIGMAS_MAPPING: dict[str, float] = {"numeric_field": 1}

        ###############################################################
        # INSTANTIATING DRIFT OBJECT + SETTING MANDATORY VARIABLES.
        ###############################################################
        drift_config = {
            # mandatory variables
            "high_risk_features": ["null_field", "categorical_field"],
            "low_risk_features": ["numeric_field"],
            "categorical_cols": ["categorical_field"],
            "numeric_cols": ["numeric_field", "null_field"],
            "text_cols": [],
            # optional variables
            "custom_column_tests": CUSTOM_COLUMN_TESTS,
            "slack_channels": ["@evan.lutins"],
            "slack_notify": ["@evan.lutins"],
            "custom_n_sigmas_mapping": CUSTOM_N_SIGMAS_MAPPING,
            "custom_missing_vals_mapping": CUSTOM_NULL_THRESHOLD_MAPPING,
        }
        self.demo_drift = BaseDriftParameters(**drift_config)
        self.next(self.profile_summary)

    @card(type="html", id="profile_summary", options={"attribute": "drift_suite_html"})
    @step
    def profile_summary(self):
        """Runs tests to determine if drift is detected for any feature, provides viz with summary statistics too.

        Runs a suite of Evidently provided tests, based on the dtype for all specified features. Also builds a report
        highlighting the tests that were run, pass or failure, and high level summary statistics for each column. The
        report depends on the tests that are specified as well as the risk level of each feature. If using the default
        tests from BaseDriftParameters class the following tests will get run for the respective dtypes:
            - numeric fields: TestMeanInNSigmas, TestColumnShareOfMissingValues
            - categorical fields: TestShareOfOutListValues, TestColumnDrift
            - response fields: TestColumnDrift

        A full list of tests offered by Evidently can be found here: https://docs.evidentlyai.com/reference/all-tests
        """
        from evidently import ColumnMapping

        drift_suite = self.demo_drift.build_test_suite()

        print("building column mapping")
        cmap = ColumnMapping(
            numerical_features=self.demo_drift.high_risk_num_features + self.demo_drift.low_risk_num_features,
            categorical_features=self.demo_drift.high_risk_cat_features
            + self.demo_drift.low_risk_cat_features
            + self.demo_drift.response_fields,
        )

        print("running drift suite")
        drift_suite.run(reference_data=self.df_reference, current_data=self.df_current, column_mapping=cmap)
        print("running drift suite complete")
        # per-test results (name/status/parameters) consumed by the drift_report step
        self.tests = drift_suite.as_dict()["tests"]
        # producing html card for metaflow UI
        self.drift_suite_html = drift_suite.get_html()
        self.next(self.drift_report)

    @card(type="html", id="drift_report", options={"attribute": "drift_report_html"})
    @step
    def drift_report(self):
        """Generate report with additional visualizations for features in which drift was detected.

        The report will only show viz for features that failed any test run in the previous step. The viz
        generated in Evidently's Report object contains more detail than the viz from the previous step, allowing the
        user to understand why drift/anomalies were detected.

        This step is also where slack alerts to the predetermined channels would be sent if drift was detected.
        """
        from evidently import ColumnMapping

        alert_tests = [test for test in self.tests if test["status"] == "FAIL"]
        n_failed_tests = len(alert_tests)
        if n_failed_tests > 0:
            # collecting unique features that failed in the above TestSuite to pass to the DriftReport in next step
            alert_feature_types, unique_alert_features = self.demo_drift.extract_failed_features(alert_tests)
            # converting numeric fields to float16 for evidently report
            self.df_current[alert_feature_types["numeric"]] = self.df_current[alert_feature_types["numeric"]].astype(
                np.float16
            )
            self.df_reference[alert_feature_types["numeric"]] = self.df_reference[
                alert_feature_types["numeric"]
            ].astype(np.float16)
            drift_report = self.demo_drift.generate_drift_report(
                categorical_features=alert_feature_types["categorical"],  # type: ignore
                numeric_features=alert_feature_types["numeric"],  # type: ignore
                text_features=alert_feature_types["text"],  # type: ignore
            )
            cmap = ColumnMapping(
                categorical_features=alert_feature_types["categorical"],  # type: ignore
                numerical_features=alert_feature_types["numeric"],  # type: ignore
                text_features=alert_feature_types["text"],  # type: ignore
            )
            drift_report.run(
                reference_data=self.df_reference[unique_alert_features],
                current_data=self.df_current[unique_alert_features],
                column_mapping=cmap,
            )
            self.drift_report_html = drift_report.get_html()
            ## NOTE: this is where you could insert a notification service to alert relevant users that drift has occurred
            ## eg: Realtor.com uses slack sdk to send slack alerts to dedicated drift channels
            # BUGFIX: original message said "{n} features" although n_failed_tests counts failed
            # *tests* (one feature can fail several), and referenced an unrelated "LeadPool" dataset.
            print(
                f"Drift detected for a total of {n_failed_tests} failed tests based on "
                "the predetermined constraints.\nClick on the link above to view which specific constraints failed"
            )
        else:
            # BUGFIX: the @card decorator above reads self.drift_report_html; the original left the
            # attribute unset on the no-drift path, which broke card rendering.
            self.drift_report_html = "<p>No drift, data errors, or anomalies detected.</p>"
            print("There was zero drift, data errors, or anomalies detected across any feature in the dataset :)")
        self.next(self.end)

    @enable_decorator(batch(cpu=1, memory=1_000, queue="memory", image=EVIDENTLY_IMAGE), BATCH_ENABLE)
    @step
    def end(self):
        """Terminal step; runs under AWS Batch only when BATCH_ENABLE is set."""
        print("flow is finished")
# Instantiating the FlowSpec subclass hands control to the Metaflow CLI, which
# parses command-line arguments (e.g. ``run``, ``show``) and executes the flow.
if __name__ == "__main__":
    DemoDriftFlow()
# (GitHub page chrome removed: sign-up / sign-in-to-comment links)