Skip to content

Instantly share code, notes, and snippets.

@anthonyburdi
Created October 26, 2022 20:43
Show Gist options
  • Save anthonyburdi/4d804644bcbef124feea40d5cb8f830b to your computer and use it in GitHub Desktop.
Save anthonyburdi/4d804644bcbef124feea40d5cb8f830b to your computer and use it in GitHub Desktop.
# In discussion for https://github.com/great-expectations/great_expectations/pull/6164
from collections import OrderedDict
import pandas as pd
from great_expectations.self_check.util import build_spark_validator_with_data
from great_expectations.validator.validator import Validator
from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.core.batch import Batch
from great_expectations.expectations.core import ExpectCompoundColumnsToBeUnique
from great_expectations.core.expectation_configuration import ExpectationConfiguration
from pyspark import Row
from pyspark.sql import SparkSession
from great_expectations.dataset import SparkDFDataset
# requirements
# great_expectations==0.15.28
# pyspark==3.2.1
# pytest==6.0.1
def reproduce_4295_issue_v2_spark():
    """Run the compound-uniqueness expectation through ``SparkDFDataset``.

    Builds a three-row Spark DataFrame in which every row shares the same
    ``(country, city)`` pair, wraps it in ``SparkDFDataset`` and calls
    ``expect_compound_columns_to_be_unique`` with the ``COMPLETE`` result
    format. The assertions pin the observed outcome: ``details`` carries a
    ``partial_unexpected_counts_error`` and ``partial_unexpected_counts``
    is empty.

    NOTE(review): "v2" presumably refers to the legacy Dataset API of
    great_expectations -- confirm against the linked discussion.
    """
    # given: three rows that differ only in the "area" column
    column_names = ["country", "city", "area"]
    records = [
        ("Poland", "Warsaw", 1000),
        ("Poland", "Warsaw", 500),
        ("Poland", "Warsaw", 1500),
    ]
    session_builder = SparkSession.builder.master("local[*]")
    spark = session_builder.appName("reproduce_4295_issue").getOrCreate()
    spark_df = spark.createDataFrame(
        (Row(*record) for record in records), column_names
    )
    dataset = SparkDFDataset(spark_df)

    # when
    result = dataset.expect_compound_columns_to_be_unique(
        column_list=["country", "city"],
        result_format={"result_format": "COMPLETE"},
    )

    # then: the error message is reported under "details"
    counts_error = (
        "partial_unexpected_counts requested, but requires a hashable type"
    )
    reported_error = result["result"]["details"][
        "partial_unexpected_counts_error"
    ]
    assert reported_error == counts_error

    # full result: every row is reported as unexpected
    duplicated_pair = [("country", "Poland"), ("city", "Warsaw")]
    unexpected_rows = [OrderedDict(duplicated_pair) for _ in records]
    expected_result = {
        "element_count": 3,
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_count": 3,
        "unexpected_percent": 100.0,
        "unexpected_percent_total": 100.0,
        "unexpected_percent_nonmissing": 100.0,
        "partial_unexpected_list": list(unexpected_rows),
        "details": {"partial_unexpected_counts_error": counts_error},
        "partial_unexpected_index_list": None,
        "partial_unexpected_counts": [],
        "unexpected_list": list(unexpected_rows),
        "unexpected_index_list": None,
    }
    assert result.result == expected_result
def reproduce_4295_issue_v3_spark():
    """Run the compound-uniqueness expectation through a Spark validator.

    Creates a pandas DataFrame whose three rows share one
    ``(country, city)`` pair, hands it to
    ``build_spark_validator_with_data`` together with a local Spark
    session, and calls ``expect_compound_columns_to_be_unique`` with the
    ``COMPLETE`` result format. The result is printed, compared against
    the full expected dictionary, and checked to have no truthy
    ``details`` entry.
    """
    source_frame = pd.DataFrame(
        {
            "country": ["Poland"] * 3,
            "city": ["Warsaw"] * 3,
            "area": [1000, 500, 1500],
        }
    )
    session_builder = SparkSession.builder.master("local[*]")
    spark_session = session_builder.appName(
        "reproduce_4295_issue"
    ).getOrCreate()

    validator = build_spark_validator_with_data(source_frame, spark_session)
    result = validator.expect_compound_columns_to_be_unique(
        column_list=["country", "city"],
        result_format="COMPLETE",
    )
    print(result.result)

    # Every row is expected to be reported as a duplicate.
    duplicated_pair = {"country": "Poland", "city": "Warsaw"}
    unexpected_rows = [dict(duplicated_pair) for _ in range(3)]
    expected_result = {
        "element_count": 3,
        "unexpected_count": 3,
        "unexpected_percent": 100.0,
        "partial_unexpected_list": list(unexpected_rows),
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_percent_total": 100.0,
        "unexpected_percent_nonmissing": 100.0,
        "partial_unexpected_index_list": None,
        "partial_unexpected_counts": [
            {"value": ("Poland", "Warsaw"), "count": 3}
        ],
        "unexpected_list": list(unexpected_rows),
        "unexpected_index_list": None,
    }
    assert result.result == expected_result

    # No "details" entry (or only a falsy one) may be present.
    assert not result.result.get("details")
def reproduce_4295_issue_v3_pandas():
    """Run the compound-uniqueness expectation on a pandas-backed validator.

    Builds an ``ExpectationConfiguration`` for
    ``expect_compound_columns_to_be_unique`` (``COMPLETE`` result format),
    instantiates ``ExpectCompoundColumnsToBeUnique`` from it, and validates
    it against a ``Validator`` that wraps a three-row pandas DataFrame in
    which every row shares one ``(country, city)`` pair. The full result
    dictionary, including the index lists ``[0, 1, 2]``, is asserted.
    """
    source_frame = pd.DataFrame(
        {
            "country": ["Poland"] * 3,
            "city": ["Warsaw"] * 3,
            "area": [1000, 500, 1500],
        }
    )

    expectation_kwargs = {
        "column_list": ["country", "city"],
        "result_format": {"result_format": "COMPLETE"},
    }
    configuration = ExpectationConfiguration(
        expectation_type="expect_compound_columns_to_be_unique",
        kwargs=expectation_kwargs,
    )
    expectation = ExpectCompoundColumnsToBeUnique(configuration)

    pandas_batch = Batch(data=source_frame)
    pandas_engine = PandasExecutionEngine()
    validator = Validator(
        execution_engine=pandas_engine,
        batches=[pandas_batch],
    )
    result = expectation.validate(validator)

    # Every row is expected to be reported as a duplicate, with its index.
    duplicated_pair = {"country": "Poland", "city": "Warsaw"}
    unexpected_rows = [dict(duplicated_pair) for _ in range(3)]
    unexpected_indices = [0, 1, 2]
    expected_result = {
        "element_count": 3,
        "unexpected_count": 3,
        "unexpected_percent": 100.0,
        "partial_unexpected_list": list(unexpected_rows),
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_percent_total": 100.0,
        "unexpected_percent_nonmissing": 100.0,
        "partial_unexpected_index_list": list(unexpected_indices),
        "partial_unexpected_counts": [
            {"value": ("Poland", "Warsaw"), "count": 3}
        ],
        "unexpected_list": list(unexpected_rows),
        "unexpected_index_list": list(unexpected_indices),
    }
    assert result.result == expected_result
if __name__ == "__main__":
    # Run each reproduction in sequence; a failing assertion in one
    # stops the script before the later ones run.
    for reproduction in (
        reproduce_4295_issue_v2_spark,
        reproduce_4295_issue_v3_pandas,
        reproduce_4295_issue_v3_spark,
    ):
        reproduction()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment