Last active
July 6, 2019 16:10
-
-
Save micimize/a96dbebe5365cc158b5589e1f6f461a4 to your computer and use it in GitHub Desktop.
efficient goodtables constraint checks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import typing as t | |
from goodtables import Error, check | |
from tableschema.exceptions import TableSchemaException | |
from .schema_field import ConstraintsError, ExtensibleField | |
# NOTE: the `primaryKey` and `unique` constraints are not accounted for here.
# TODO: look into optimizing unique constraint checks.

# Maps each tableschema constraint name to the goodtables error code that is
# passed to `Error` when that constraint is violated.
constraint_check_map = dict(
    required="required-constraint",
    enum="enumerable-constraint",
    minimum="minimum-constraint",
    maximum="maximum-constraint",
    maxLength="maximum-length-constraint",
    minLength="minimum-length-constraint",
    pattern="pattern-constraint",
)

# Name under which the combined constraints check is registered via `@check`.
SCHEMA_CONSTRAINTS_CHECKS = "schema-constraints-checks"
@check(SCHEMA_CONSTRAINTS_CHECKS, type="custom", context="body")
def schema_constraints_checks(cells):
    """Check every cell against its field's type, format and constraints.

    Each cell's value is cast once with ``field.cast_value(value,
    constraints=True)``; one error is reported per violated constraint, or a
    single ``type-or-format-error`` when the value cannot be cast at all.

    Arguments:
        cells: list of cell dicts; each is read through its ``"field"`` and
            ``"value"`` keys. Cells whose value fails to cast are removed
            from this list in place.

    Returns:
        list: the ``Error`` objects collected for all cells.
    """
    errors: t.List[Error] = []

    # Iterate over a snapshot: cells that fail to cast are removed from
    # ``cells`` below, and removing from a list while iterating over that
    # same list makes the loop skip the element after each removed one.
    for cell in list(cells):
        field: ExtensibleField = cell.get("field")

        # Skip if cell has no field
        if field is None:
            continue

        value = cell.get("value")

        # Cast value
        try:
            field.cast_value(value, constraints=True)
        except ConstraintsError as ce:
            # Handled before the broader TableSchemaException clause
            # (presumably ConstraintsError derives from it - confirm).
            fvalue = f'"{value}"'
            for constraint in ce.constraints:
                substitutions = {"value": fvalue}
                # "required" messages carry no constraint value.
                if constraint != "required":
                    substitutions["constraint"] = (
                        f'"{field.constraints[constraint]}"'
                    )
                errors.append(
                    Error(
                        constraint_check_map[constraint],
                        cell,
                        message_substitutions=substitutions,
                    )
                )
        except TableSchemaException:
            # type-or-format errors swallow other errors
            errors.append(
                Error(
                    "type-or-format-error",
                    cell,
                    message_substitutions={
                        "value": f'"{value}"',
                        "field_type": f'"{field.type}"',
                        "field_format": f'"{field.format}"',
                    },
                )
            )
            # Drop the cell from the caller's list (presumably so later
            # checks do not report on a value that could not be cast).
            cells.remove(cell)

    return errors
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import decimal | |
import json | |
import typing as t | |
from tableschema import Field, config | |
from tableschema.exceptions import CastError | |
from tableschema.exceptions import ConstraintError as _ConstraintError | |
# TODO should be aqdc Error | |
class ConstraintsError(_ConstraintError):
    """Raised when a value violates one or more constraints of a field.

    Unlike a single-constraint error, this carries the names of every
    constraint that failed for the value.
    """

    # Names of the constraints the value did not satisfy.
    constraints: t.List[str]
    # The offending value, as passed to the constructor.
    value: t.Any
    # Name of the field the value belongs to.
    field_name: str

    def __init__(self, field_name: str, value: t.Any, constraints: t.List[str]):
        """Build the error message and store the details on the instance.

        Arguments:
            field_name: name of the field whose constraints were violated
            value: the value that failed the checks
            constraints: names of the violated constraints
        """
        # The trailing space after ``{constraints}`` matters: the two
        # fragments are concatenated, and without it the list runs straight
        # into the word "which".
        super().__init__(
            f'Field "{field_name}" has constraints {constraints} '
            f'which are not satisfied for value "{value}"'
        )
        self.value = value
        self.constraints = constraints
        self.field_name = field_name
class ExtensibleField(Field):
    """Table Schema field representation.

    Extends ``tableschema.Field`` with descriptor accessors (``title``,
    ``description``, ``default_value``), a ``cast_value`` that reports every
    violated constraint at once through ``ConstraintsError``, and
    ``uncast_value`` for turning a value back into a string.
    """

    @property
    def title(self):
        """The descriptor's ``title`` entry, or ``None`` when absent."""
        return self.descriptor.get("title", None)

    @property
    def description(self):
        """The descriptor's ``description`` entry, or ``None`` when absent."""
        return self.descriptor.get("description", None)

    @property
    def default_value(self):
        """The descriptor's ``defaultValue`` entry, or ``None`` when absent."""
        return self.descriptor.get("defaultValue", None)

    # Name-mangled private attributes of the parent ``Field`` class
    # (presumably assigned by ``Field.__init__`` - confirm against the
    # installed tableschema version). Declared here for type checkers and
    # exposed through the read-only properties below.
    _Field__descriptor: dict
    _Field__missing_values: list
    # Indexed by constraint name in ``_cast_value``, so this is a mapping.
    _Field__check_functions: t.Dict[str, t.Callable[[t.Any], bool]]
    _Field__cast_function: t.Callable[[t.Any], t.Any]

    @property
    def _missing_values(self):
        """Values that are treated as null when casting."""
        return self._Field__missing_values

    @property
    def _check_functions(self):
        """Mapping of constraint name to its check callable."""
        return self._Field__check_functions

    @property
    def _cast_function(self):
        """Callable that casts a raw value for this field."""
        return self._Field__cast_function

    _uncast_function: t.Callable[[t.Any], str]

    def _get_uncast_function(self):
        """Select the module-level uncast helper matching ``self.type``.

        Raises:
            KeyError: if the type is not one of date, datetime, integer,
                number, string or array.
        """
        # NOTE(review): because this runs from ``__init__``, constructing a
        # field of any other type (e.g. "boolean") fails with KeyError.
        # Confirm whether that restriction is intended.
        if self.type in ("date", "datetime"):
            return _get_datetime_uncast(self.format)
        return {
            "integer": _int_uncast,
            "number": _number_uncast,
            "string": _string_uncast,
            "array": _arr_uncast,
        }[self.type]

    def __init__(self, descriptor, missing_values=config.DEFAULT_MISSING_VALUES):
        """Initialise the parent field and pick the uncast helper."""
        super().__init__(descriptor, missing_values)
        self._uncast_function = self._get_uncast_function()

    def _cast_value(self, value, constraints=True):
        """https://github.com/frictionlessdata/tableschema-py#field

        Arguments:
            value: the raw value to cast
            constraints: ``True`` to run every check function, a list of
                constraint names to run only those, or a falsy value to skip
                constraint checking. Names in the list that the field has no
                check function for are ignored.

        Returns:
            The cast value, or ``None`` for a missing value.

        Raises:
            CastError: if the value cannot be cast
            ConstraintsError: if constraints are not met
        """
        # Null value
        if value in self._missing_values:
            value = None

        # Cast value
        cast_value = value
        if value is not None:
            cast_value = self._cast_function(value)
            if cast_value == config.ERROR:
                raise CastError(
                    (
                        'Field "{field.name}" can\'t cast value "{value}" '
                        'for type "{field.type}" with format "{field.format}"'
                    ).format(field=self, value=value)
                )

        # Check value
        if constraints:
            checks = self._check_functions
            if isinstance(constraints, list):
                # Keep the caller's order, but skip names without a check
                # function instead of failing with a bare KeyError.
                names = [name for name in constraints if name in checks]
            else:
                names = list(checks)
            invalid = [name for name in names if not checks[name](cast_value)]
            if invalid:
                raise ConstraintsError(self.name, value, constraints=invalid)

        return cast_value

    def cast_value(self, value, unsafe=False, constraints=True):
        """Cast a value, substituting ``default_value`` for null results.

        Arguments:
            value: the raw value to cast
            unsafe: when true, swallow any error and return the raw value
                (or ``default_value`` if the raw value is ``None``)
            constraints: forwarded to ``_cast_value``

        Returns:
            The cast value, or ``default_value`` when the cast result is
            ``None``.

        Raises:
            Exception: whatever ``_cast_value`` raised, when ``unsafe`` is
                false.
        """
        try:
            cast = self._cast_value(value, constraints)
        except Exception:
            if unsafe:
                return self.default_value if value is None else value
            # Bare ``raise`` re-raises the active exception unchanged.
            raise
        else:
            return self.default_value if cast is None else cast

    def __uncast_value(self, value, str_fallback=False) -> str:
        """Stringifies a given field value

        Arguments:
            value: the value to stringify
            str_fallback: use ``str(value)`` when no type-specific rule
                applies, instead of raising

        Raises:
            CastError: if no rule applies and ``str_fallback`` is false
        """
        if (self.type == "integer" and isinstance(value, int)) or (
            self.type == "number" and isinstance(value, decimal.Decimal)
        ):
            return str(value)
        elif self.type == "string":
            if isinstance(value, str):
                return value
            else:
                # Non-string values of a string field are JSON-encoded.
                return json.dumps(value) if value is not None else ""
        elif self.type in ("date", "datetime") and isinstance(
            value, (datetime.datetime, datetime.date)
        ):
            # A custom format is used as a strftime pattern; otherwise ISO.
            if self.format not in [None, "any", "default"]:
                return value.strftime(self.format)
            else:
                return value.isoformat()
        elif self.type == "array" and isinstance(value, (list, tuple)):
            return json.dumps(value)
        elif value is None:
            return ""
        elif str_fallback:
            return str(value)
        else:
            message = (
                f"Failed to uncast field {self.name} of type {self.type} "
                f"for value '{value}' safely"
            )
            raise CastError(message, errors=[message])

    def uncast_value(self, value, unsafe=False):
        """Stringifies a given field value

        Arguments:
            unsafe: fall back to ``str(value)`` where no type-specific rule
                applies, and return an empty string if stringifying still
                raises

        Returns:
            str: the uncast value

        Raises:
            CastError: If value is not encodable and unsafe = False
        """
        try:
            return self.__uncast_value(value, str_fallback=unsafe)
        except Exception:
            if unsafe:
                return ""
            # Bare ``raise`` re-raises the active exception unchanged.
            raise
def _int_uncast(value):
    """Return ``str(value)`` for an ``int``; ``None`` for anything else."""
    return str(value) if isinstance(value, int) else None
def _number_uncast(value):
    """Return ``str(value)`` for a ``Decimal``; ``None`` for anything else."""
    if not isinstance(value, decimal.Decimal):
        return None
    return str(value)
def _string_uncast(value):
    """Stringify a value of a string field.

    Strings pass through unchanged, ``None`` becomes the empty string, and
    every other value is JSON-encoded.
    """
    if isinstance(value, str):
        return value
    if value is None:
        return ""
    return json.dumps(value)
def _get_datetime_uncast(type_format):
    """Build an uncast function for date/datetime values.

    Arguments:
        type_format: ``None``, ``"any"`` or ``"default"`` to emit ISO format;
            any other value is used as a ``strftime`` pattern.

    Returns:
        A callable that stringifies ``date``/``datetime`` values and returns
        ``None`` for anything else.
    """
    use_iso = type_format in [None, "any", "default"]

    def _datetime_uncast(value):
        if not isinstance(value, (datetime.datetime, datetime.date)):
            return None
        if use_iso:
            return value.isoformat()
        return value.strftime(type_format)

    return _datetime_uncast
def _arr_uncast(value):
    """JSON-encode a list or tuple; return ``None`` for anything else."""
    return json.dumps(value) if isinstance(value, (list, tuple)) else None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment