Skip to content

Instantly share code, notes, and snippets.

@a-chumagin
Created April 4, 2023 20:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save a-chumagin/b9dd5fa2e5b56eae7353c8944e1de202 to your computer and use it in GitHub Desktop.
Save a-chumagin/b9dd5fa2e5b56eae7353c8944e1de202 to your computer and use it in GitHub Desktop.
GX: checkpoint in parallel
import great_expectations as gx
from great_expectations.data_context.types.base import DataContextConfig, FilesystemStoreBackendDefaults
import os
from great_expectations.core.batch import RuntimeBatchRequest
import pytest
@pytest.fixture
def create_context():
    """Build a Great Expectations context rooted at ``./tripdata``.

    The context is created from an in-code ``DataContextConfig`` that uses
    filesystem store defaults, then receives the datasource described by
    ``datasource_config()`` and the checkpoint described by
    ``create_checkpoint_config()``. The context configuration is also
    written to disk through ``save_config_to_file``.

    Returns:
        The context object returned by ``gx.get_context``.
    """
    context_dir = os.path.abspath("./tripdata")
    data_context_config = DataContextConfig(
        store_backend_defaults=FilesystemStoreBackendDefaults(
            root_directory=context_dir,
        ),
    )
    context = gx.get_context(project_config=data_context_config)
    context.add_datasource(**datasource_config())
    # Saved right after the datasource is added and before the checkpoint is
    # registered -- presumably so the YAML on disk contains the datasource.
    save_config_to_file(context)
    context.add_or_update_checkpoint(**create_checkpoint_config())
    return context
def save_config_to_file(context, path="./tripdata/great_expectations.yml"):
    """Write the context's configuration as YAML to ``path``.

    Args:
        context: Object exposing ``config.to_yaml(stream)``.
        path: Destination file. Defaults to
            ``./tripdata/great_expectations.yml``, the location that was
            previously hard-coded.

    The parent directory of ``path`` is created when it does not exist, so
    the call no longer fails with ``FileNotFoundError`` on a fresh checkout.
    """
    parent_dir = os.path.dirname(path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(path, "w") as outfile:
        context.config.to_yaml(outfile)
def create_checkpoint_config():
    """Return the keyword arguments describing the ``my_checkpoint`` checkpoint.

    Returns:
        A new dict with the checkpoint ``name``, ``config_version``,
        ``class_name`` and ``run_name_template`` entries. A fresh dict is
        built on every call, so callers may mutate it freely.
    """
    return {
        "name": "my_checkpoint",
        "config_version": 1,
        "class_name": "SimpleCheckpoint",
        "run_name_template": "%Y%m%d-%H%M%S-my-run-name-template",
    }
def datasource_config():
    """Return the keyword arguments describing the ``taxi_datasource`` datasource.

    The configuration names a ``PandasExecutionEngine`` and two data
    connectors: a ``RuntimeDataConnector`` keyed by
    ``default_identifier_name`` and an
    ``InferredAssetFilesystemDataConnector`` over ``./data/``.

    Returns:
        A new nested dict; a fresh one is built on every call.
    """
    # Returned directly: a local named ``datasource_config`` would shadow
    # this function's own name inside its body.
    return {
        "name": "taxi_datasource",
        "class_name": "Datasource",
        "module_name": "great_expectations.datasource",
        "execution_engine": {
            "module_name": "great_expectations.execution_engine",
            "class_name": "PandasExecutionEngine",
        },
        "data_connectors": {
            "default_runtime_data_connector_name": {
                "class_name": "RuntimeDataConnector",
                "module_name": "great_expectations.datasource.data_connector",
                "batch_identifiers": ["default_identifier_name"],
            },
            "default_inferred_data_connector_name": {
                "class_name": "InferredAssetFilesystemDataConnector",
                "base_directory": "./data/",
                "default_regex": {
                    "group_names": ["data_asset_name"],
                    "pattern": "(.*)",
                },
            },
        },
    }
@pytest.fixture
def yellow_taxi_batch_request():
    """Return a ``RuntimeBatchRequest`` for the yellow taxi parquet file.

    The request targets ``taxi_datasource`` through
    ``default_runtime_data_connector_name`` and reads
    ``./data/yellow_tripdata_2022-01.parquet``.
    """
    # Returned directly: a local named like the fixture would shadow it.
    return RuntimeBatchRequest(
        datasource_name="taxi_datasource",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="yellow_taxi",
        runtime_parameters={"path": "./data/yellow_tripdata_2022-01.parquet"},
        batch_identifiers={"default_identifier_name": "yellow_taxi"},
    )
@pytest.fixture
def green_taxi_batch_request():
    """Return a ``RuntimeBatchRequest`` for the green taxi parquet file.

    The request targets ``taxi_datasource`` through
    ``default_runtime_data_connector_name`` and reads
    ``./data/green_tripdata_2022-01.parquet``.
    """
    return RuntimeBatchRequest(
        datasource_name="taxi_datasource",
        data_connector_name="default_runtime_data_connector_name",
        data_asset_name="green_taxi",
        runtime_parameters={"path": "./data/green_tripdata_2022-01.parquet"},
        batch_identifiers={"default_identifier_name": "green_taxi"},
    )
@pytest.mark.parametrize(
    "expectation_suite_name, taxi_type",
    [
        ("integrity", "green"),
        ("completeness", "green"),
        ("integrity", "yellow"),
        ("completeness", "yellow"),
    ],
)
def test_tax_data_set(create_context, expectation_suite_name, taxi_type, request):
    """Run ``my_checkpoint`` for one suite/taxi-type pair and require success.

    Args:
        create_context: Context fixture on which the checkpoint is run.
        expectation_suite_name: Name of the expectation suite to validate with.
        taxi_type: Prefix (``green`` or ``yellow``) selecting the
            ``<taxi_type>_taxi_batch_request`` fixture.
        request: pytest's built-in fixture, used to resolve the batch
            request fixture by name at run time.
    """
    # The batch fixture depends on a parameter, so it is looked up
    # dynamically instead of being listed in the signature.
    taxi_batch = request.getfixturevalue(f"{taxi_type}_taxi_batch_request")
    validation = {
        "batch_request": taxi_batch,
        "expectation_suite_name": expectation_suite_name,
    }
    result = create_context.run_checkpoint(
        checkpoint_name="my_checkpoint",
        validations=[validation],
    )
    assert result["success"] is True, (
        f"checkpoint failed for suite {expectation_suite_name!r} "
        f"on {taxi_type} taxi data"
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment