# encoding=utf8
"""
A DAG that demonstrates use of the operators in this provider package.
"""
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, MutableMapping, Optional, Set, Union
import logging
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
from great_expectations.data_context.types.base import (
    CheckpointConfig,
)
GE_SLACK_WEBHOOK = os.getenv("AIRFLOW_VAR_GREATEXPECTATION_SLACK_WEBHOOK")
log = logging.getLogger(__name__)
# DATA CONTEXT - DATASOURCE CONFIGS
#     "redshift": {
#         "execution_engine": {
#             "class_name": "SqlAlchemyExecutionEngine",
#             "credentials": {
#                 "drivername": drivername,
#                 "host": host,
#                 "port": port,
#                 "username": username,
#                 "password": password,
#                 "database": database,
#                 "schema_name": "public",
#                 "query": {
#                     "sslmode": "prefer"
#                 }
#             },
#             "module_name": "great_expectations.execution_engine"
#         },
#         "data_connectors": {
#             "default_runtime_data_connector_name": {
#                 "module_name": "great_expectations.datasource.data_connector",
#                 "class_name": "RuntimeDataConnector",
#                 "batch_identifiers": [
#                     "default_identifier_name"
#                 ]
#             },
#             "default_inferred_data_connector_name": {
#                 "module_name": "great_expectations.datasource.data_connector",
#                 "class_name": "InferredAssetSqlDataConnector"
#             }
#         },
#         "class_name": "Datasource",
#         "module_name": "great_expectations.datasource"
#     },
#     "athena": {
#         "execution_engine": {
#             "module_name": "great_expectations.execution_engine",
#             "class_name": "SqlAlchemyExecutionEngine",
#             "connection_string": f"awsathena+rest://@athena.{REGION}.amazonaws.com:443/?s3_staging_dir=s3://{S3_ATHENA_BUCKET_NAME}/great_expectations/"
#         },
#         "data_connectors": {
#             "default_runtime_data_connector_name": {
#                 "class_name": "RuntimeDataConnector",
#                 "batch_identifiers": [
#                     "default_identifier_name"
#                 ],
#                 "module_name": "great_expectations.datasource.data_connector"
#             },
#             "default_inferred_data_connector_name": {
#                 "class_name": "InferredAssetSqlDataConnector",
#                 "module_name": "great_expectations.datasource.data_connector"
#             }
#         },
#         "module_name": "great_expectations.datasource",
#         "class_name": "Datasource",
#     },
# },
def get_batch_request(
    datasource_name: str, data_asset_name: str, query: str
) -> Dict[str, Any]:
    """Build a runtime batch request dict that validates the result of a SQL query."""
    return dict(
        datasource_name=datasource_name,
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name=data_asset_name,
        runtime_parameters={"query": query},
        batch_identifiers={"default_identifier_name": "default_identifier"},
    )
def get_checkpoint_config(
    checkpoint_name: str,
    expectation_suite_name: str,
    notify_on: str = "all",
    notify_with: Union[str, List[str]] = "all",
) -> CheckpointConfig:
    """Build a SimpleCheckpoint config with Slack notification settings."""
    checkpoint_config = dict(
        name=checkpoint_name,
        config_version=1,
        run_name_template=f"{checkpoint_name}-{datetime.now().isoformat()}",
        module_name="great_expectations.checkpoint",
        class_name="SimpleCheckpoint",
        expectation_suite_name=expectation_suite_name,
        slack_webhook=GE_SLACK_WEBHOOK,
        notify_on=notify_on,
        notify_with=notify_with,
    )
    return CheckpointConfig(**checkpoint_config)
default_args = {
    "owner": "Guido",
    "depends_on_past": False,
    "retry_delay": timedelta(minutes=1),
    "retries": 0,
}
with DAG(
    dag_id="poc_greatexpectations_dag",
    default_args=default_args,
    start_date=datetime(2021, 12, 15),
    catchup=False,
    schedule_interval=None,
) as dag:
    data_context_config = None  # DATA CONTEXT CONFIGURATION DELETED FOR CONFIDENTIALITY
    # It contains all stores, datasources, and other settings, equivalent to a
    # complete `great_expectations.yaml`, and references the datasource configs above.
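    # A minimal sketch (not the redacted config) of what such a programmatic data
    # context could look like, assuming an S3-backed store/Data Docs setup;
    # DataContextConfig and S3StoreBackendDefaults would come from
    # great_expectations.data_context.types.base, and the bucket name is hypothetical:
    #
    # data_context_config = DataContextConfig(
    #     store_backend_defaults=S3StoreBackendDefaults(
    #         default_bucket_name="my-great-expectations-bucket"
    #     ),
    #     datasources={
    #         # the "redshift" and "athena" datasource dicts sketched above
    #     },
    # )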
    # Redshift
    checkpoint_kwargs = {
        "run_name": "Airflow: {{ run_id }}-{{ task_instance_key_str }}",
        "batch_request": get_batch_request(
            datasource_name="redshift",
            data_asset_name="db.table",
            query="""
                select *
                from db.table
                limit 100
            """,
        ),
    }
    checkpoint_config = get_checkpoint_config(
        checkpoint_name="some checkpoint name redshift",
        expectation_suite_name="redshift.db.table.warning",
        notify_on="all",
        notify_with=["local_site"],  # TODO:review, not working
    )
    ge_redshift_validation_example = GreatExpectationsOperator(
        task_id="ge_redshift_validation_example",
        data_context_config=data_context_config,
        checkpoint_config=checkpoint_config,
        checkpoint_kwargs=checkpoint_kwargs,
        fail_task_on_validation_failure=False,
        validation_failure_callback=(lambda x: print("Callback successfully run", x)),
    )
    # Athena test
    checkpoint_kwargs = {
        "run_name": "Airflow: {{ run_id }}-{{ task_instance_key_str }}",
        "batch_request": get_batch_request(
            datasource_name="athena",
            data_asset_name="db.table",
            query="""
                select *
                from db.table
                limit 100
            """,
        ),
    }
    checkpoint_config = get_checkpoint_config(
        checkpoint_name="some checkpoint name athena",
        expectation_suite_name="athena.db.table.warning",
        notify_on="all",
        notify_with=["local_site"],  # TODO:review, not working
    )
    ge_athena_validation_example = GreatExpectationsOperator(
        task_id="ge_athena_validation_example",
        data_context_config=data_context_config,
        checkpoint_config=checkpoint_config,
        checkpoint_kwargs=checkpoint_kwargs,
        fail_task_on_validation_failure=False,
        validation_failure_callback=(lambda x: print("Callback successfully run", x)),
    )
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    (
        start
        >> ge_redshift_validation_example
        >> ge_athena_validation_example
        >> end
    )