Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created June 27, 2021 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-geller/82f5aa4f45745db30ab9a53a8282b8f8 to your computer and use it in GitHub Desktop.
Save anna-geller/82f5aa4f45745db30ab9a53a8282b8f8 to your computer and use it in GitHub Desktop.
import json
import boto3
import logging
import pandas as pd
from great_expectations import from_pandas
from great_expectations.dataset.pandas_dataset import PandasDataset
from great_expectations.core.expectation_validation_result import (
ExpectationValidationResult,
)
class TimeseriesDataTestRunner:
def __init__(
self,
aws_region: str = "eu-central-1",
sns_topic_arn: str = "arn:aws:sns:eu-central-1:338306982838:ge_timeseries_data_test",
dt_column: str = "timestamp",
dt_column_dtype: str = "datetime64[ns]",
nr_column: str = "value",
nr_column_dtype: str = "int64",
min_value: int = 0,
max_value: int = 100,
min_total_row_count: int = 672,
max_total_row_count: int = 744,
):
self.aws_region = aws_region
self.sns_topic_arn = sns_topic_arn
self.dt_column = dt_column
self.dt_column_dtype = dt_column_dtype
self.nr_column = nr_column
self.nr_column_dtype = nr_column_dtype
self.min_value = min_value
self.max_value = max_value
self.min_total_row_count = min_total_row_count
self.max_total_row_count = max_total_row_count
self.logger = logging.getLogger()
self.logger.setLevel(logging.INFO)
def send_email_alert(self, message: str) -> None:
sns = boto3.client("sns", region_name=self.aws_region)
sns_result = sns.publish(TopicArn=self.sns_topic_arn, Message=message)
self.logger.info("SNS email sent. Result: %s", sns_result)
def parse_data_test_result(
self,
validation_result: ExpectationValidationResult,
dataset_name: str = "Timeseries demo",
) -> None:
test_result = validation_result.to_json_dict()
if test_result["success"]:
self.logger.info("Data test passed for dataset %s.", dataset_name)
else:
alert = (
f"Data test failed for dataset: {dataset_name}."
f" VALIDATION RESULT: {json.dumps(test_result)}"
)
self.logger.error(alert)
self.send_email_alert(alert)
def test_order_of_columns(
self, ge_df: PandasDataset
) -> ExpectationValidationResult:
return ge_df.expect_table_columns_to_match_ordered_list(
column_list=[self.dt_column, self.nr_column]
)
def test_row_count(self, ge_df: PandasDataset) -> ExpectationValidationResult:
"""Hourly time series in monthly batches:
31 days * 24 h = 744 rows,
28 days * 24 h = 672 rows"""
return ge_df.expect_table_row_count_to_be_between(
max_value=self.max_total_row_count, min_value=self.min_total_row_count
)
def test_null_values_nr_column(
self, ge_df: PandasDataset
) -> ExpectationValidationResult:
return ge_df.expect_column_values_to_not_be_null(column=self.nr_column)
def test_null_values_dt_column(
self, ge_df: PandasDataset
) -> ExpectationValidationResult:
return ge_df.expect_column_values_to_not_be_null(column=self.dt_column)
def test_data_type_nr_column(
self, ge_df: PandasDataset
) -> ExpectationValidationResult:
return ge_df.expect_column_values_to_be_of_type(
column=self.nr_column, type_=self.nr_column_dtype
)
def test_data_type_dt_column(
self, ge_df: PandasDataset
) -> ExpectationValidationResult:
return ge_df.expect_column_values_to_be_of_type(
column=self.dt_column, type_=self.dt_column_dtype
)
def test_value_range(self, ge_df: PandasDataset) -> ExpectationValidationResult:
return ge_df.expect_column_values_to_be_between(
column=self.nr_column, min_value=self.min_value, max_value=self.max_value
)
def run_data_tests(self, dataframe: pd.DataFrame,) -> None:
ge_df = from_pandas(dataframe)
tests_to_run = [
self.test_order_of_columns(ge_df),
self.test_row_count(ge_df),
self.test_null_values_nr_column(ge_df),
self.test_null_values_dt_column(ge_df),
self.test_data_type_nr_column(ge_df),
self.test_data_type_dt_column(ge_df),
self.test_value_range(ge_df),
]
for data_test in tests_to_run:
self.parse_data_test_result(data_test)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment