Created
February 27, 2022 18:39
-
-
Save guidoturtu/d909e7d83ab797db4a5aeeb78fd8cbc1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding=utf8 | |
""" | |
A DAG that demonstrates use of the operators in this provider package. | |
""" | |
import os | |
from datetime import datetime, timedelta | |
from pathlib import Path | |
from typing import Any, Dict, List, MutableMapping, Optional, Set, Union | |
import logging | |
from airflow import DAG | |
from airflow.operators.dummy import DummyOperator | |
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator | |
from great_expectations.data_context.types.base import ( | |
CheckpointConfig, | |
) | |
# Slack webhook used by the checkpoints below to post validation results;
# read from the Airflow Variable GREATEXPECTATION_SLACK_WEBHOOK exposed as
# an environment variable (AIRFLOW_VAR_* convention). May be None if unset.
GE_SLACK_WEBHOOK = os.getenv("AIRFLOW_VAR_GREATEXPECTATION_SLACK_WEBHOOK")
# Module-level logger, standard logging convention.
log = logging.getLogger(__name__)
# DATA CONTEXT - DATASOURCE CONFIGS | |
# "redshift": { | |
# "execution_engine": { | |
# "class_name": "SqlAlchemyExecutionEngine", | |
# "credentials": { | |
# "drivername": drivername, | |
# "host": host, | |
# "port": port, | |
# "username": username, | |
# "password": password, | |
# "database": database, | |
# "schema_name": "public", | |
# "query": { | |
# "sslmode": "prefer" | |
# } | |
# }, | |
# "module_name": "great_expectations.execution_engine" | |
# }, | |
# "data_connectors": { | |
# "default_runtime_data_connector_name": { | |
# "module_name": "great_expectations.datasource.data_connector", | |
# "class_name": "RuntimeDataConnector", | |
# "batch_identifiers": [ | |
# "default_identifier_name" | |
# ] | |
# }, | |
# "default_inferred_data_connector_name": { | |
# "module_name": "great_expectations.datasource.data_connector", | |
# "class_name": "InferredAssetSqlDataConnector" | |
# } | |
# }, | |
# "class_name": "Datasource", | |
# "module_name": "great_expectations.datasource" | |
# }, | |
# "athena": { | |
# "execution_engine": { | |
# "module_name": "great_expectations.execution_engine", | |
# "class_name": "SqlAlchemyExecutionEngine", | |
# "connection_string": f"awsathena+rest://@athena.{REGION}.amazonaws.com:443/?s3_staging_dir=s3://{S3_ATHENA_BUCKET_NAME}/great_expectations/" | |
# }, | |
# "data_connectors": { | |
# "default_runtime_data_connector_name": { | |
# "class_name": "RuntimeDataConnector", | |
# "batch_identifiers": [ | |
# "default_identifier_name" | |
# ], | |
# "module_name": "great_expectations.datasource.data_connector" | |
# }, | |
# "default_inferred_data_connector_name": { | |
# "class_name": "InferredAssetSqlDataConnector", | |
# "module_name": "great_expectations.datasource.data_connector" | |
# } | |
# }, | |
# "module_name": "great_expectations.datasource", | |
# "class_name": "Datasource", | |
# }, | |
# }, | |
def get_batch_request(
    datasource_name: str, data_asset_name: str, query: str
) -> Dict[str, Any]:
    """Build a runtime batch-request dict for a SQL-query-backed batch.

    The request targets the datasource's ``default_runtime_data_connector_name``
    connector and supplies the SQL ``query`` as the runtime parameter, with a
    fixed placeholder batch identifier.

    Args:
        datasource_name: Name of the datasource in the GE data context.
        data_asset_name: Logical name for the asset being validated.
        query: SQL text that produces the batch to validate.

    Returns:
        A plain dict in the shape of a GE ``RuntimeBatchRequest``.
    """
    return {
        "datasource_name": datasource_name,
        "data_connector_name": "default_runtime_data_connector_name",
        "data_asset_name": data_asset_name,
        "runtime_parameters": {"query": query},
        "batch_identifiers": {"default_identifier_name": "default_identifier"},
    }
def get_checkpoint_config(
    checkpoint_name: str,
    expectation_suite_name: str,
    notify_on: str = "all",
    notify_with: Union[str, List[str]] = "all",
) -> CheckpointConfig:
    """Build a ``SimpleCheckpoint`` config that notifies Slack on validation.

    Args:
        checkpoint_name: Name of the checkpoint; also seeds the run-name template.
        expectation_suite_name: Expectation suite the checkpoint validates against.
        notify_on: Which validation outcomes trigger the Slack notification.
        notify_with: Data-docs site name(s) to link in the notification. Both
            call sites in this DAG pass a list (e.g. ``["local_site"]``), so the
            annotation accepts either a single name or a list of names.

    Returns:
        A populated ``CheckpointConfig`` wired to ``GE_SLACK_WEBHOOK``.
    """
    # NOTE: datetime.now() runs at DAG-parse time, not at task run time, so the
    # template's timestamp reflects when the scheduler parsed this file.
    return CheckpointConfig(
        name=checkpoint_name,
        config_version=1,
        run_name_template=f"{checkpoint_name}-{datetime.now().isoformat()}",
        module_name="great_expectations.checkpoint",
        class_name="SimpleCheckpoint",
        expectation_suite_name=expectation_suite_name,
        slack_webhook=GE_SLACK_WEBHOOK,
        notify_on=notify_on,
        notify_with=notify_with,
    )
# Task-level defaults applied to every operator in the DAG below:
# no retries, and no dependence on prior runs' task states.
default_args = dict(
    owner="Guido",
    depends_on_past=False,
    retry_delay=timedelta(minutes=1),
    retries=0,
)
with DAG(
    dag_id="poc_greatexpectations_dag",
    default_args=default_args,
    start_date=datetime(2021, 12, 15),
    catchup=False,
    schedule_interval=None  # manual/triggered runs only
) as dag:
    # In-code GE DataContext config (stores, datasources, data-docs sites),
    # equivalent to a full `great_expectations.yaml`; the datasource entries
    # match the commented-out redshift/athena configs above. Redacted here.
    data_context_config= None # DATA CONTEXT CONFIGURATION DELETED FOR CONFIDENTIALITY

    # --- Redshift validation ---
    # Runtime kwargs merged into the checkpoint at execution time; run_name
    # uses Airflow Jinja templating so each task run is uniquely identifiable.
    checkpoint_kwargs = {
        "run_name": "Airflow: {{ run_id }}-{{ task_instance_key_str }}",
        "batch_request": get_batch_request(
            datasource_name="redshift",
            data_asset_name="db.table",
            query="""
            select *
            from db.table
            limit 100
            """
        )
    }
    checkpoint_config = get_checkpoint_config(
        checkpoint_name="some checkpoint name redshift",
        expectation_suite_name="redshift.db.table.warning",
        notify_on="all",
        notify_with=["local_site"],  # TODO: review — list form not working here
    )
    ge_redshift_validation_example = GreatExpectationsOperator(
        task_id="ge_redshift_validation_example",
        data_context_config=data_context_config,
        checkpoint_config=checkpoint_config,
        checkpoint_kwargs=checkpoint_kwargs,
        # Keep the task green on validation failure; the callback fires instead.
        fail_task_on_validation_failure=False,
        validation_failure_callback=(lambda x: print("Callback successfully run", x)),
    )

    # --- Athena validation (same pattern, athena datasource) ---
    checkpoint_kwargs = {
        "run_name": "Airflow: {{ run_id }}-{{ task_instance_key_str }}",
        "batch_request": get_batch_request(
            datasource_name="athena",
            data_asset_name="db.table",
            query="""
            select *
            from db.table
            limit 100
            """
        )
    }
    checkpoint_config = get_checkpoint_config(
        checkpoint_name="some checkpoint name athena",
        expectation_suite_name="athena.db.table.warning",
        notify_on="all",
        notify_with=["local_site"],  # TODO: review — list form not working here
    )
    ge_athena_validation_example = GreatExpectationsOperator(
        task_id="ge_athena_validation_example",
        data_context_config=data_context_config,
        checkpoint_config=checkpoint_config,
        checkpoint_kwargs=checkpoint_kwargs,
        fail_task_on_validation_failure=False,
        validation_failure_callback=(lambda x: print("Callback successfully run", x)),
    )

    # No-op boundary tasks; validations run sequentially between them.
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")
    (
        start
        >> ge_redshift_validation_example
        >> ge_athena_validation_example
        >> end
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment