Skip to content

Instantly share code, notes, and snippets.

@elijahbenizzy
Last active July 4, 2023 16:57
Show Gist options
  • Save elijahbenizzy/ea3528e0d8b08f7205b0fe441b4b9cfb to your computer and use it in GitHub Desktop.
Save elijahbenizzy/ea3528e0d8b08f7205b0fe441b4b9cfb to your computer and use it in GitHub Desktop.
from typing import Type

import pandas as pd

from hamilton import driver
from hamilton.ad_hoc_utils import create_temporary_module
from hamilton.data_quality.base import DataValidator, ValidationResult, DataValidationLevel
from hamilton.function_modifiers import check_output_custom
class UniqueColumnsValidator(DataValidator):
    """Data-quality validator that passes only when a DataFrame's column labels are unique.

    Instances are meant to be handed to ``check_output_custom`` so the check
    runs against the output of a decorated function.
    """

    def __init__(self, importance: str):
        """Create the validator.

        :param importance: Severity of a failed check; forwarded unchanged to
            ``DataValidator.__init__``.
        """
        super().__init__(importance=importance)

    @classmethod
    def applies_to(cls, datatype: Type[Type]) -> bool:
        """Return True when ``datatype`` is ``pd.DataFrame`` or a subclass of it."""
        # ``issubclass`` raises TypeError for non-class objects (e.g. typing
        # generics), so anything that is not a real class simply does not apply.
        return isinstance(datatype, type) and issubclass(datatype, pd.DataFrame)

    def description(self) -> str:
        """Return a human-readable statement of what this validator enforces."""
        return "Columns must be unique"

    @classmethod
    def name(cls) -> str:
        """Return the identifier of this validator."""
        return "unique_columns"

    def validate(self, dataset: pd.DataFrame) -> ValidationResult:
        """Check that no column label of ``dataset`` occurs more than once.

        :param dataset: The DataFrame to inspect.
        :return: A ``ValidationResult`` whose ``passes`` flag mirrors
            ``dataset.columns.is_unique`` and whose message matches the outcome.
        """
        passes = dataset.columns.is_unique
        # Only claim a problem when there is one; a passing result previously
        # carried the "not unique" text as well.
        if passes:
            message = "Columns are unique"
        else:
            message = f"Columns not unique: {dataset.columns}"
        return ValidationResult(passes=passes, message=message)

    def __eq__(self, other) -> bool:
        """Two validators are equal when they share a type and an importance."""
        # This class has no ``equal_to`` attribute; ``importance`` is its only
        # configuration, so that is what equality is based on.
        return isinstance(other, self.__class__) and self.importance == other.importance

    def __hash__(self) -> int:
        """Hash consistently with ``__eq__`` so instances remain hashable."""
        return hash(self.importance)
@check_output_custom(UniqueColumnsValidator(DataValidationLevel.FAIL))
def df() -> pd.DataFrame:
    """Build a one-row DataFrame with the distinct columns ``a`` and ``b``."""
    records = [{"a": 1, "b": 2}]
    return pd.DataFrame.from_records(records)
@check_output_custom(UniqueColumnsValidator(DataValidationLevel.FAIL))
def df_non_unique() -> pd.DataFrame:
    """Build a one-row DataFrame whose two columns are both labelled ``a``."""
    records = [{"a": 1, "b": 2}]
    frame = pd.DataFrame.from_records(records)
    # Overwrite the labels so they collide on purpose.
    frame.columns = ["a", "a"]
    return frame
if __name__ == "__main__":
    # Wrap the two decorated functions in a throwaway module for the driver.
    temp_module = create_temporary_module(df, df_non_unique)
    hamilton_driver = driver.Driver({}, temp_module)

    requested_nodes = ["df", "df_non_unique"]
    # Render the execution graph to ./out, then run both nodes.
    hamilton_driver.visualize_execution(requested_nodes, "./out", {})
    hamilton_driver.execute(requested_nodes)
@elijahbenizzy
Copy link
Author

image

@elijahbenizzy
Copy link
Author

When run:

python ./custom_output_check.py
Note: Hamilton collects completely anonymous data about usage. This will help us improve Hamilton over time. See https://github.com/dagworks-inc/hamilton#usage-analytics--data-privacy for details.
[df_non_unique:unique_columns] validator failed. Message was: Columns not unique: Index(['a', 'a'], dtype='object'). Diagnostic information is: {}.

Node df_non_unique encountered an error
Traceback (most recent call last):
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/graph.py", line 481, in dfs_traverse
    value = adapter.execute_node(node_, kwargs)
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/base.py", line 457, in execute_node
    return node.callable(**kwargs)
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/function_modifiers/validation.py", line 84, in final_node_callable
    dq_base.act_fail_bulk(node_.name, failures)
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/data_quality/base.py", line 106, in act_fail_bulk
    raise DataValidationError(error_messages)
hamilton.data_quality.base.DataValidationError: ["[df_non_unique:unique_columns] validator failed. Message was: Columns not unique: Index(['a', 'a'], dtype='object'). Diagnostic information is: {}.\n"]
-------------------------------------------------------------------
Oh no an error! Need help with Hamilton?
Join our slack and ask for help! https://join.slack.com/t/hamilton-opensource/shared_invite/zt-1bjs72asx-wcUTgH7q7QX1igiQ5bbdcg
-------------------------------------------------------------------

Traceback (most recent call last):
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/user_examples/./custom_output_check.py", line 51, in <module>
    dr.execute(['df', 'df_non_unique'])
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/driver.py", line 278, in execute
    raise e
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/driver.py", line 271, in execute
    outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/driver.py", line 387, in raw_execute
    self.graph.execute(nodes, memoized_computation, overrides, inputs)
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/graph.py", line 529, in execute
    return FunctionGraph.execute_static(
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/graph.py", line 492, in execute_static
    dfs_traverse(final_var_node, dep_type)
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/graph.py", line 481, in dfs_traverse
    value = adapter.execute_node(node_, kwargs)
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/base.py", line 457, in execute_node
    return node.callable(**kwargs)
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/function_modifiers/validation.py", line 84, in final_node_callable
    dq_base.act_fail_bulk(node_.name, failures)
  File "/Users/elijahbenizzy/dev/dagworks/os/hamilton/hamilton/data_quality/base.py", line 106, in act_fail_bulk
    raise DataValidationError(error_messages)
hamilton.data_quality.base.DataValidationError: ["[df_non_unique:unique_columns] validator failed. Message was: Columns not unique: Index(['a', 'a'], dtype='object'). Diagnostic information is: {}.\n"]

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment