Skip to content

Instantly share code, notes, and snippets.

@PaoloLeonard
Last active September 8, 2021 20:22
Show Gist options
  • Save PaoloLeonard/104494d1a0c01ad7891fd934c684c447 to your computer and use it in GitHub Desktop.
Table metric implementation for the GE table expectation tutorial.
from typing import Dict, Tuple, Any
from great_expectations.core.batch_spec import PathBatchSpec
from great_expectations.execution_engine import (
SparkDFExecutionEngine,
PandasExecutionEngine
)
from great_expectations.expectations.metrics.metric_provider import metric_value
from great_expectations.expectations.metrics.table_metric_provider import (
TableMetricProvider,
)
class OtherTableRowCount(TableMetricProvider):
    """Metric provider that reports the row count of a table other than the
    current batch's table.

    The target table is named by the ``table_filename`` entry of the metric
    domain kwargs; the file is loaded through the active execution engine as a
    CSV and its row count is returned.
    """

    # Registered metric name used by expectations to request this value.
    metric_name = "table.row_count_other"

    @metric_value(engine=PandasExecutionEngine)
    def _pandas(
        cls,
        execution_engine: "PandasExecutionEngine",
        metric_domain_kwargs: Dict,
        metric_value_kwargs: Dict,
        metrics: Dict[Tuple, Any],
        runtime_configuration: Dict,
    ) -> int:
        """Count the rows of the external CSV via the pandas execution engine."""
        # Point a batch spec at the other table's file; pandas reads it with read_csv.
        spec = PathBatchSpec(
            {
                "path": metric_domain_kwargs.get("table_filename"),
                "reader_method": "read_csv",
            }
        )
        other_df = execution_engine.get_batch_data(batch_spec=spec).dataframe
        # shape[0] is the pandas DataFrame row count.
        return other_df.shape[0]

    @metric_value(engine=SparkDFExecutionEngine)
    def _spark(
        cls,
        execution_engine: "SparkDFExecutionEngine",
        metric_domain_kwargs: Dict,
        metric_value_kwargs: Dict,
        metrics: Dict[Tuple, Any],
        runtime_configuration: Dict,
    ) -> int:
        """Count the rows of the external CSV via the Spark execution engine."""
        # Point a batch spec at the other table's file; Spark reads it with its csv reader.
        spec = PathBatchSpec(
            {
                "path": metric_domain_kwargs.get("table_filename"),
                "reader_method": "csv",
            }
        )
        other_df = execution_engine.get_batch_data(batch_spec=spec).dataframe
        # Spark DataFrame.count() triggers the row-count action.
        return other_df.count()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment