Last active
December 30, 2021 19:03
-
-
Save moritzkoerber/9ad2a6d1d94efb57cba2cbca04dbd901 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import yaml | |
import great_expectations as ge | |
import os | |
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource | |
from great_expectations.core.batch import BatchRequest | |
from great_expectations.core.expectation_configuration import ExpectationConfiguration | |
from contextlib import suppress | |
# Bootstrap the Great Expectations project under
# <cwd>/own_de_project/great_expectations and make it the working directory.
project_dir = os.path.join(os.getcwd(), "own_de_project", "great_expectations")

# exist_ok=True makes re-running the script a no-op for an existing directory.
os.makedirs(project_dir, exist_ok=True)
os.chdir(project_dir)

# DataContext.create() is called only for its effect of initialising the
# project at project_dir; its return value is not kept, because the context
# used by the rest of the script is the one returned by get_context().
# NOTE(review): get_context() presumably locates the project relative to the
# current working directory, hence the chdir above -- confirm against the
# installed Great Expectations version.
ge.data_context.DataContext.create(project_dir)
context = ge.get_context()
# --- Datasource ------------------------------------------------------------
# Declare the datasource with a Pandas execution engine and three data
# connectors: a runtime connector, an inferred-asset S3 connector and an
# inferred-asset filesystem connector.
datasource_name = "vaccinations"
s3_bucket = "data-pipeline-s3-bucket-production"

# The YAML body starts at column 0 on purpose: its indentation is part of the
# configuration that gets parsed.
datasource_yaml = fr"""
name: {datasource_name}
class_name: Datasource
execution_engine:
  class_name: PandasExecutionEngine
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
  s3_data_connector:
    class_name: InferredAssetS3DataConnector
    bucket: {s3_bucket}
    prefix: data/vaccinations/date=2021-12-10
    default_regex:
      group_names:
        - data_asset_name
      pattern: (.*)
  file_system_data_connector_name:
    class_name: InferredAssetFilesystemDataConnector
    base_directory: ../
    default_regex:
      group_names:
        - data_asset_name
      pattern: (.*)
"""

# Check the configuration first, then store it in the project, replacing any
# datasource previously saved under the same name.
context.test_yaml_config(yaml_config=datasource_yaml)
sanitize_yaml_and_save_datasource(context, datasource_yaml, overwrite_existing=True)
# --- Datasource smoke test -------------------------------------------------
# Load one parquet asset through the S3 connector and print its first rows.
# A validator needs an expectation suite, so a throwaway one is (re)created.
smoke_suite_name = "test_suite"
context.create_expectation_suite(
    expectation_suite_name=smoke_suite_name,
    overwrite_existing=True,
)

batch_request = BatchRequest(
    datasource_name=datasource_name,
    data_connector_name="s3_data_connector",
    data_asset_name="data/vaccinations/date=2021-12-10/2021-12-10.parquet",
)

validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=smoke_suite_name,
)
print(validator.head())
# --- Expectation suite -----------------------------------------------------
# Create (or overwrite) the suite and give it a single expectation: the table
# must have exactly these columns, in exactly this order.
expectation_suite_name = "exp_suite"
suite = context.create_expectation_suite(
    expectation_suite_name=expectation_suite_name,
    overwrite_existing=True,
)

expected_columns = [
    "cases",
    "deaths",
    "recovered",
    "weekIncidence",
    "casesPer100k",
    "casesPerWeek",
    "delta.cases",
    "delta.deaths",
    "delta.recovered",
    "r.value",
    "r.rValue4Days.value",
    "r.rValue4Days.date",
    "r.rValue7Days.value",
    "r.rValue7Days.date",
    "r.lastUpdate",
    "hospitalization.cases7Days",
    "hospitalization.incidence7Days",
    "hospitalization.date",
    "hospitalization.lastUpdate",
    "meta.source",
    "meta.contact",
    "meta.info",
    "meta.lastUpdate",
    "meta.lastCheckedForUpdate",
]

expectation_configuration = ExpectationConfiguration(
    expectation_type="expect_table_columns_to_match_ordered_list",
    kwargs={"column_list": expected_columns},
)
suite.add_expectation(expectation_configuration=expectation_configuration)

# Persist the suite in the project so it can be referenced by name.
context.save_expectation_suite(
    expectation_suite=suite,
    expectation_suite_name=expectation_suite_name,
)
# --- Checkpoint on local data ----------------------------------------------
# Define a checkpoint that validates a local parquet file, read through the
# filesystem data connector, against the expectation suite created above.
my_checkpoint_name = "checkpoint"

# The datasource and suite names are interpolated from the variables defined
# earlier in the script so the checkpoint cannot drift from them. The YAML
# body starts at column 0 on purpose: its indentation is part of the config.
checkpoint_yaml = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
validations:
  - batch_request:
      datasource_name: {datasource_name}
      data_connector_name: file_system_data_connector_name
      data_asset_name: 2021-12-13.parquet
      data_connector_query:
        index: -1
    expectation_suite_name: {expectation_suite_name}
"""

context.test_yaml_config(yaml_config=checkpoint_yaml)

# yaml.load() without an explicit Loader emits a warning on PyYAML 5.1+ and
# raises TypeError on PyYAML 6; safe_load() parses this plain mapping the same
# way and cannot construct arbitrary Python objects.
context.add_checkpoint(**yaml.safe_load(checkpoint_yaml))

# Run the checkpoint and report the outcome of its first validation. The
# checkpoint above configures a single validation.
results = context.run_checkpoint(checkpoint_name=my_checkpoint_name)
first_run = next(iter(results.run_results.values()))
validation_succeeded = first_run["validation_result"]["success"]

# Previously this value was computed as a bare expression and discarded, which
# only shows anything in an interactive session; print it so a script run
# reports the result too.
print(f"Validation succeeded: {validation_succeeded}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment