Demo metaflow for running drift detection
"""Simple flow to demonstrate how to run feature drift detection with Evidently on some dummy data."""

from __future__ import annotations

import numpy as np
import pandas as pd
from metaflow import FlowSpec, batch, card, step  # pylint: disable=no-name-in-module

# BaseDriftParameters found in gist: https://gist.github.com/elutins/14db8768fef4ebd68c197d41b74f93f4
from tiers_utils.base_drift_parameters import BaseDriftParameters
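
# The flow below assumes BaseDriftParameters exposes roughly the following interface (inferred from how it is
# used in this file; see the linked gist for the actual implementation):
#   - attributes: high_risk_num_features, low_risk_num_features, high_risk_cat_features,
#     low_risk_cat_features, response_fields
#   - build_test_suite() -> an Evidently TestSuite configured from the column dtypes and risk levels
#   - extract_failed_features(failed_tests) -> (mapping of failed features keyed by "numeric"/"categorical"/"text",
#     list of unique failed feature names)
#   - generate_drift_report(categorical_features, numeric_features, text_features) -> an Evidently Report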


class DemoDriftFlow(FlowSpec):
    """A demo flow showing feature drift detection with Evidently on dummy data."""

    @step
    def start(self):
        """Creates a reference and a current dataset of 5 rows each for demonstration purposes."""
        # create dummy data for the reference dataset. each column will have 5 values
        ref_nulls = [None] * 2 + [1] * 3
        ref_categoricals = ["a", "b", "c", "d", "E"]
        ref_rates = np.random.default_rng().random(size=5)
        self.df_reference = pd.DataFrame(
            {
                "null_field": ref_nulls,
                "categorical_field": ref_categoricals,
                "numeric_field": ref_rates,
            }
        )

        # create dummy data for the current dataset. again, each column will have 5 values.
        # the 'null_field' test will fail because the missing-value threshold is set to 20% in
        # BaseDriftParameters, while 60% of the values in our dummy data are null.
        # the 'categorical_field' TestShareOfOutListValues will fail because we added 'F' to the current
        # dataset, which was not seen in the reference; so 1/5 (20%) of the values are new.
        current_nulls = [None] * 3 + [1] * 2
        current_categoricals = ["a", "b", "c", "d", "F"]
        # purposefully setting these rates to be between 1-10 so TestMeanInNSigmas will fail
        current_rates = np.random.default_rng().uniform(1, 10, size=4)
        current_rates = np.append(current_rates, None)
        self.df_current = pd.DataFrame(
            {
                "null_field": current_nulls,
                "categorical_field": current_categoricals,
                "numeric_field": current_rates,
            }
        )

        ###############################################################
        # showing how to add a custom test on 'numeric_field'. All custom tests must be in the format:
        # {column_name: {test: evidently-test-name, parameters: {param1: value1}}, column_name2: ... }
        ###############################################################
        from evidently.tests import TestValueRange

        CUSTOM_COLUMN_TESTS = {"numeric_field": {"test": TestValueRange, "parameters": {"left": 0, "right": 1.0}}}

        ###############################################################
        # showing how to customize the failure threshold for certain tests and columns
        ###############################################################
        CUSTOM_NULL_THRESHOLD_MAPPING: dict[str, float] = {"null_field": 0.2}
        CUSTOM_N_SIGMAS_MAPPING: dict[str, float] = {"numeric_field": 1}

        ###############################################################
        # INSTANTIATING DRIFT OBJECT + SETTING MANDATORY VARIABLES.
        ###############################################################
        drift_config = {
            # mandatory variables
            "high_risk_features": ["null_field", "categorical_field"],
            "low_risk_features": ["numeric_field"],
            "categorical_cols": ["categorical_field"],
            "numeric_cols": ["numeric_field", "null_field"],
            "text_cols": [],
            # optional variables
            "custom_column_tests": CUSTOM_COLUMN_TESTS,
            "slack_channels": ["@evan.lutins"],
            "slack_notify": ["@evan.lutins"],
            "custom_n_sigmas_mapping": CUSTOM_N_SIGMAS_MAPPING,
            "custom_missing_vals_mapping": CUSTOM_NULL_THRESHOLD_MAPPING,
        }
        self.demo_drift = BaseDriftParameters(**drift_config)
        self.next(self.profile_summary)

    @card(type="html", id="profile_summary", options={"attribute": "drift_suite_html"})
    @step
    def profile_summary(self):
        """Runs tests to determine whether drift is detected for any feature and builds a visual summary.

        Runs a suite of Evidently-provided tests, chosen by dtype, for all specified features. Also builds a
        report highlighting the tests that were run, their pass/fail status, and high-level summary statistics
        for each column. The report depends on the tests that are specified as well as the risk level of each
        feature. If using the default tests from the BaseDriftParameters class, the following tests get run for
        the respective dtypes:
          - numeric fields: TestMeanInNSigmas, TestColumnShareOfMissingValues
          - categorical fields: TestShareOfOutListValues, TestColumnDrift
          - response fields: TestColumnDrift
        A full list of tests offered by Evidently can be found here: https://docs.evidentlyai.com/reference/all-tests
        """
        from evidently import ColumnMapping

        drift_suite = self.demo_drift.build_test_suite()
        print("building column mapping")
        cmap = ColumnMapping(
            numerical_features=self.demo_drift.high_risk_num_features + self.demo_drift.low_risk_num_features,
            categorical_features=self.demo_drift.high_risk_cat_features
            + self.demo_drift.low_risk_cat_features
            + self.demo_drift.response_fields,
        )
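        # ColumnMapping tells Evidently how to treat each column (numerical vs categorical vs text);
        # target/prediction columns could also be declared on it if the datasets contained them.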
print("running drift suite")
drift_suite.run(reference_data=self.df_reference, current_data=self.df_current, column_mapping=cmap)
print("running drift suite complete")
self.tests = drift_suite.as_dict()["tests"]
# producing html card for metaflow UI
self.drift_suite_html = drift_suite.get_html()
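        # the suite could also be persisted outside of the Metaflow card if needed, e.g. (path is hypothetical):
        # drift_suite.save_html("/tmp/demo_drift_suite.html")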
        self.next(self.drift_report)

    @card(type="html", id="drift_report", options={"attribute": "drift_report_html"})
    @step
    def drift_report(self):
        """Generate a report with additional visualizations for the features in which drift was detected.

        The report only shows visualizations for features that failed any of the tests run in the previous
        step. The visualizations generated by Evidently's Report object contain more detail than those from the
        previous step, allowing the user to understand why drift/anomalies were detected.
        This step is also where slack alerts could be sent to predetermined channels if drift was detected.
        """
        from evidently import ColumnMapping

        alert_tests = [test for test in self.tests if test["status"] == "FAIL"]
        n_failed_tests = len(alert_tests)
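        # quick visibility into what failed; "name" and "description" are fields Evidently includes in the
        # as_dict() output used above:
        # for failed_test in alert_tests:
        #     print(f"FAILED: {failed_test['name']} - {failed_test['description']}")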
        if n_failed_tests > 0:
            # collecting the unique features that failed in the TestSuite above to pass to the drift report below
            alert_feature_types, unique_alert_features = self.demo_drift.extract_failed_features(alert_tests)
            # converting numeric fields to float16 for the evidently report
            self.df_current[alert_feature_types["numeric"]] = self.df_current[alert_feature_types["numeric"]].astype(
                np.float16
            )
            self.df_reference[alert_feature_types["numeric"]] = self.df_reference[
                alert_feature_types["numeric"]
            ].astype(np.float16)
            drift_report = self.demo_drift.generate_drift_report(
                categorical_features=alert_feature_types["categorical"],  # type: ignore
                numeric_features=alert_feature_types["numeric"],  # type: ignore
                text_features=alert_feature_types["text"],  # type: ignore
            )
            cmap = ColumnMapping(
                categorical_features=alert_feature_types["categorical"],  # type: ignore
                numerical_features=alert_feature_types["numeric"],  # type: ignore
                text_features=alert_feature_types["text"],  # type: ignore
            )
            drift_report.run(
                reference_data=self.df_reference[unique_alert_features],
                current_data=self.df_current[unique_alert_features],
                column_mapping=cmap,
            )
            self.drift_report_html = drift_report.get_html()
            ## NOTE: this is where you could insert a notification service to alert relevant users that drift occurred
            ## eg: Realtor.com uses the slack sdk to send alerts to dedicated drift channels
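            # a minimal sketch of such an alert using slack_sdk (channel name and token env var are placeholders):
            # import os
            # from slack_sdk import WebClient
            # slack_client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
            # slack_client.chat_postMessage(
            #     channel="#drift-alerts",
            #     text=f"DemoDriftFlow: {n_failed_tests} drift tests failed on the current dataset.",
            # )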
            print(
                f"Drift detected: {n_failed_tests} tests failed on the current data based on the predetermined "
                "constraints.\nClick on the link above to view which specific constraints failed"
            )
        else:
            print("No drift, data errors, or anomalies were detected for any feature in the dataset :)")
        self.next(self.end)

    # enable_decorator, EVIDENTLY_IMAGE, and BATCH_ENABLE are assumed to come from the project's shared utilities
    # (enable_decorator conditionally applies the batch decorator); they are not defined in this gist.
    @enable_decorator(batch(cpu=1, memory=1_000, queue="memory", image=EVIDENTLY_IMAGE), BATCH_ENABLE)
    @step
    def end(self):
        """Mark the flow as finished."""
        print("flow is finished")


if __name__ == "__main__":
    DemoDriftFlow()
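
# To execute the flow locally (assuming this file is saved as demo_drift_flow.py):
#   python demo_drift_flow.py run
# The "profile_summary" and "drift_report" HTML cards can then be viewed in the Metaflow UI or via Metaflow's card CLI.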