Skip to content

Instantly share code, notes, and snippets.

@moritzkoerber
Last active December 30, 2021 19:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save moritzkoerber/9ad2a6d1d94efb57cba2cbca04dbd901 to your computer and use it in GitHub Desktop.
import yaml
import great_expectations as ge
import os
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource
from great_expectations.core.batch import BatchRequest
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from contextlib import suppress
# Create (or reuse) the Great Expectations project directory and load its context.
project_dir = f"{os.getcwd()}/own_de_project/great_expectations"
os.makedirs(project_dir, exist_ok=True)  # idempotent: no error if it already exists
os.chdir(project_dir)

# Scaffold the GE project structure on disk, then load the resulting context
# from the current working directory (which is now project_dir).
# NOTE: the original bound DataContext.create()'s return value and immediately
# overwrote it with ge.get_context(); the dead assignment is dropped here.
ge.data_context.DataContext.create(project_dir)
context = ge.get_context()
# Add a Pandas-backed datasource with three connectors:
#   - a runtime connector (for ad-hoc in-memory batches),
#   - an S3 connector inferring assets under the vaccinations prefix,
#   - a filesystem connector inferring assets relative to the project dir.
# The YAML nesting below follows the GE Datasource schema (execution_engine /
# data_connectors / per-connector options).
datasource_name = "vaccinations"
s3_bucket = "data-pipeline-s3-bucket-production"
datasource_yaml = fr"""
name: {datasource_name}
class_name: Datasource
execution_engine:
  class_name: PandasExecutionEngine
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
  s3_data_connector:
    class_name: InferredAssetS3DataConnector
    bucket: {s3_bucket}
    prefix: data/vaccinations/date=2021-12-10
    default_regex:
      group_names:
        - data_asset_name
      pattern: (.*)
  file_system_data_connector_name:
    class_name: InferredAssetFilesystemDataConnector
    base_directory: ../
    default_regex:
      group_names:
        - data_asset_name
      pattern: (.*)
"""
# Validate the config without persisting it, then save it to
# great_expectations.yml (overwriting any previous datasource of this name).
context.test_yaml_config(yaml_config=datasource_yaml)
sanitize_yaml_and_save_datasource(context, datasource_yaml, overwrite_existing=True)
# Smoke-test the S3 data connector: request a single known asset, attach it to
# a throwaway suite, and preview the first rows.
batch_request = BatchRequest(
    datasource_name=datasource_name,
    data_connector_name="s3_data_connector",
    data_asset_name="data/vaccinations/date=2021-12-10/2021-12-10.parquet",
)
context.create_expectation_suite(
    expectation_suite_name="test_suite", overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="test_suite"
)
# If this prints real rows, the datasource and connector are wired correctly.
print(validator.head())
# Create the "exp_suite" expectation suite: the table's columns must match
# this exact ordered list (the flattened RKI vaccination/covid API schema).
expectation_suite_name = "exp_suite"
suite = context.create_expectation_suite(
    expectation_suite_name=expectation_suite_name, overwrite_existing=True
)
expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_table_columns_to_match_ordered_list",
    kwargs={
        "column_list": [
            "cases",
            "deaths",
            "recovered",
            "weekIncidence",
            "casesPer100k",
            "casesPerWeek",
            "delta.cases",
            "delta.deaths",
            "delta.recovered",
            "r.value",
            "r.rValue4Days.value",
            "r.rValue4Days.date",
            "r.rValue7Days.value",
            "r.rValue7Days.date",
            "r.lastUpdate",
            "hospitalization.cases7Days",
            "hospitalization.incidence7Days",
            "hospitalization.date",
            "hospitalization.lastUpdate",
            "meta.source",
            "meta.contact",
            "meta.info",
            "meta.lastUpdate",
            "meta.lastCheckedForUpdate",
        ]
    },
)
suite.add_expectation(expectation_configuration=expectation_configuration)
# Persist the suite to the project's expectations store.
context.save_expectation_suite(
    expectation_suite=suite, expectation_suite_name=expectation_suite_name
)
# Define a SimpleCheckpoint that validates a local parquet file (via the
# filesystem connector) against the "exp_suite" suite, then run it.
my_checkpoint_name = "checkpoint"
checkpoint_yaml = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
validations:
  - batch_request:
      datasource_name: vaccinations
      data_connector_name: file_system_data_connector_name
      data_asset_name: 2021-12-13.parquet
      data_connector_query:
        index: -1
    expectation_suite_name: exp_suite
"""
context.test_yaml_config(yaml_config=checkpoint_yaml)
# safe_load instead of bare yaml.load: the config is plain YAML, and bare
# yaml.load is deprecated without a Loader and unsafe on untrusted input.
context.add_checkpoint(**yaml.safe_load(checkpoint_yaml))

## run checkpoint
results = context.run_checkpoint(checkpoint_name=my_checkpoint_name)
# Surface the overall success flag of the (single) validation result —
# the original evaluated this expression and discarded it.
print(list(results.run_results.items())[0][1]["validation_result"]["success"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment