Created
June 27, 2021 14:23
-
-
Save anna-geller/82f5aa4f45745db30ab9a53a8282b8f8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import logging
from typing import List

import boto3
import pandas as pd
from great_expectations import from_pandas
from great_expectations.core.expectation_validation_result import (
    ExpectationValidationResult,
)
from great_expectations.dataset.pandas_dataset import PandasDataset
class TimeseriesDataTestRunner:
    """Run a fixed suite of Great Expectations checks on a two-column time series.

    The suite checks column order, total row count, null values, column
    dtypes and the value range of the numeric column. Every result is
    logged, and each failing result is also published to an AWS SNS topic.

    Args:
        aws_region: Region used when creating the SNS client.
        sns_topic_arn: SNS topic that failure alerts are published to.
            NOTE(review): the default embeds a concrete AWS account ID;
            consider supplying it from configuration instead.
        dt_column: Name of the datetime column (expected first).
        dt_column_dtype: Expected dtype name of ``dt_column``.
        nr_column: Name of the numeric column (expected second).
        nr_column_dtype: Expected dtype name of ``nr_column``.
        min_value: Lower bound for values in ``nr_column``.
        max_value: Upper bound for values in ``nr_column``.
        min_total_row_count: Lower bound for the number of rows.
        max_total_row_count: Upper bound for the number of rows.
    """

    def __init__(
        self,
        aws_region: str = "eu-central-1",
        sns_topic_arn: str = "arn:aws:sns:eu-central-1:338306982838:ge_timeseries_data_test",
        dt_column: str = "timestamp",
        dt_column_dtype: str = "datetime64[ns]",
        nr_column: str = "value",
        nr_column_dtype: str = "int64",
        min_value: int = 0,
        max_value: int = 100,
        min_total_row_count: int = 672,
        max_total_row_count: int = 744,
    ):
        self.aws_region = aws_region
        self.sns_topic_arn = sns_topic_arn
        self.dt_column = dt_column
        self.dt_column_dtype = dt_column_dtype
        self.nr_column = nr_column
        self.nr_column_dtype = nr_column_dtype
        self.min_value = min_value
        self.max_value = max_value
        self.min_total_row_count = min_total_row_count
        self.max_total_row_count = max_total_row_count
        # getLogger() with no name returns the *root* logger, so setLevel()
        # below changes the logging level process-wide, not just for this
        # class. NOTE(review): presumably intended for a runtime whose root
        # logger already has a handler (e.g. AWS Lambda) -- confirm.
        self.logger = logging.getLogger()
        self.logger.setLevel(logging.INFO)

    def send_email_alert(self, message: str) -> None:
        """Publish *message* to the configured SNS topic and log the response.

        A new SNS client is created on every call. Any exception raised by
        ``publish`` propagates to the caller. NOTE(review): "email" assumes
        the topic has email subscribers; that is configured outside this file.
        """
        sns = boto3.client("sns", region_name=self.aws_region)
        sns_result = sns.publish(TopicArn=self.sns_topic_arn, Message=message)
        self.logger.info("SNS email sent. Result: %s", sns_result)

    def parse_data_test_result(
        self,
        validation_result: ExpectationValidationResult,
        dataset_name: str = "Timeseries demo",
    ) -> None:
        """Log one validation result and alert via SNS if it failed.

        Args:
            validation_result: Result returned by a Great Expectations
                ``expect_*`` call.
            dataset_name: Label used in the log message and alert text.
        """
        test_result = validation_result.to_json_dict()
        if test_result["success"]:
            self.logger.info("Data test passed for dataset %s.", dataset_name)
            return
        # The full serialized result is embedded so the alert recipient can
        # see which expectation failed and why.
        alert = (
            f"Data test failed for dataset: {dataset_name}."
            f" VALIDATION RESULT: {json.dumps(test_result)}"
        )
        self.logger.error(alert)
        self.send_email_alert(alert)

    def test_order_of_columns(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        """Expect exactly the columns ``[dt_column, nr_column]``, in that order."""
        return ge_df.expect_table_columns_to_match_ordered_list(
            column_list=[self.dt_column, self.nr_column]
        )

    def test_row_count(self, ge_df: PandasDataset) -> ExpectationValidationResult:
        """Expect the row count to lie between the configured min and max.

        With the defaults, this targets hourly time series in monthly batches:
        31 days * 24 h = 744 rows, 28 days * 24 h = 672 rows.
        """
        return ge_df.expect_table_row_count_to_be_between(
            max_value=self.max_total_row_count, min_value=self.min_total_row_count
        )

    def test_null_values_nr_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        """Expect no null values in the numeric column."""
        return ge_df.expect_column_values_to_not_be_null(column=self.nr_column)

    def test_null_values_dt_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        """Expect no null values in the datetime column."""
        return ge_df.expect_column_values_to_not_be_null(column=self.dt_column)

    def test_data_type_nr_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        """Expect the numeric column to be of type ``nr_column_dtype``."""
        return ge_df.expect_column_values_to_be_of_type(
            column=self.nr_column, type_=self.nr_column_dtype
        )

    def test_data_type_dt_column(
        self, ge_df: PandasDataset
    ) -> ExpectationValidationResult:
        """Expect the datetime column to be of type ``dt_column_dtype``."""
        return ge_df.expect_column_values_to_be_of_type(
            column=self.dt_column, type_=self.dt_column_dtype
        )

    def test_value_range(self, ge_df: PandasDataset) -> ExpectationValidationResult:
        """Expect numeric values to lie between ``min_value`` and ``max_value``."""
        return ge_df.expect_column_values_to_be_between(
            column=self.nr_column, min_value=self.min_value, max_value=self.max_value
        )

    def run_data_tests(
        self,
        dataframe: pd.DataFrame,
        *,
        dataset_name: str = "Timeseries demo",
    ) -> List[ExpectationValidationResult]:
        """Run every check against *dataframe*, then report each result.

        All expectations are evaluated first. Each result is then passed to
        ``parse_data_test_result``, which logs it and sends one SNS alert per
        failing check, so several failures produce several alerts.

        Args:
            dataframe: The time-series batch to validate.
            dataset_name: Label used in log messages and alerts. It defaults
                to the label ``parse_data_test_result`` uses by default.

        Returns:
            The validation results, in the order the checks were run.
        """
        ge_df = from_pandas(dataframe)
        checks = (
            self.test_order_of_columns,
            self.test_row_count,
            self.test_null_values_nr_column,
            self.test_null_values_dt_column,
            self.test_data_type_nr_column,
            self.test_data_type_dt_column,
            self.test_value_range,
        )
        # Evaluate everything before reporting: a failing SNS publish during
        # reporting must not prevent the remaining expectations from running.
        results = [check(ge_df) for check in checks]
        for result in results:
            self.parse_data_test_result(result, dataset_name=dataset_name)
        return results
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment