Skip to content

Instantly share code, notes, and snippets.

@micimize
Last active July 6, 2019 16:10
Show Gist options
  • Save micimize/a96dbebe5365cc158b5589e1f6f461a4 to your computer and use it in GitHub Desktop.
Save micimize/a96dbebe5365cc158b5589e1f6f461a4 to your computer and use it in GitHub Desktop.
efficient goodtables constraint checks
import typing as t
from goodtables import Error, check
from tableschema.exceptions import TableSchemaException
from .schema_field import ConstraintsError, ExtensibleField
# NOTE: the `primaryKey` and `unique` constraints are not handled here.
# TODO: look into optimizing unique constraint checks.

# Lookup table: tableschema constraint name -> goodtables constraint code.
constraint_check_map = {
    "required": "required-constraint",
    "enum": "enumerable-constraint",
    "minimum": "minimum-constraint",
    "maximum": "maximum-constraint",
    "maxLength": "maximum-length-constraint",
    "minLength": "minimum-length-constraint",
    "pattern": "pattern-constraint",
}

# Identifier under which the custom check is registered with goodtables.
SCHEMA_CONSTRAINTS_CHECKS = "schema-constraints-checks"
@check(SCHEMA_CONSTRAINTS_CHECKS, type="custom", context="body")
def schema_constraints_checks(cells):
    """Check every cell against its field's type, format and constraints.

    Each cell is cast exactly once with constraints enabled; the outcome of
    that single cast is translated into goodtables errors.

    Arguments:
        cells: mutable sequence of cell mappings; the ``"field"`` and
            ``"value"`` keys are read. Cells whose value cannot be cast are
            removed from this sequence in place.

    Returns:
        list: one ``Error`` per violated constraint, or a single
        ``"type-or-format-error"`` for a cell whose value cannot be cast.
    """
    errors: t.List[Error] = []
    # Iterate over a snapshot: uncastable cells are removed from ``cells``
    # below, and removing from a list while iterating it makes the loop skip
    # the element that follows each removed one.
    for cell in list(cells):
        field: ExtensibleField = cell.get("field")
        # Skip if cell has no field
        if field is None:
            continue
        value = cell.get("value")
        # Cast value
        try:
            field.cast_value(value, constraints=True)
        except ConstraintsError as ce:
            # Handled before TableSchemaException on purpose; presumably
            # ConstraintsError is a subclass of it -- confirm against
            # tableschema.exceptions.
            fvalue = f'"{value}"'
            errors += [
                Error(
                    constraint_check_map[constraint],
                    cell,
                    # The "required" message has no constraint value to show.
                    message_substitutions={
                        "value": fvalue,
                        "constraint": f'"{field.constraints[constraint]}"',
                    }
                    if constraint != "required"
                    else {"value": fvalue},
                )
                for constraint in ce.constraints
            ]
        except TableSchemaException:
            # type-or-format errors swallow other errors
            errors.append(
                Error(
                    "type-or-format-error",
                    cell,
                    message_substitutions={
                        "value": f'"{value}"',
                        "field_type": f'"{field.type}"',
                        "field_format": f'"{field.format}"',
                    },
                )
            )
            # Drop the cell from the caller's list; presumably so that later
            # checks do not report it again -- confirm against goodtables.
            cells.remove(cell)
    return errors
import datetime
import decimal
import json
import typing as t
from tableschema import Field, config
from tableschema.exceptions import CastError
from tableschema.exceptions import ConstraintError as _ConstraintError
# TODO should be aqdc Error
class ConstraintsError(_ConstraintError):
    """Error carrying every constraint a single value failed to satisfy.

    Attributes are documented next to their declarations below.
    """

    # Names of the constraints that were not satisfied.
    constraints: t.List[str]
    # The offending value, as passed to the constructor.
    value: t.Any
    # Name of the field the value belongs to.
    field_name: str

    def __init__(self, field_name: str, value: t.Any, constraints: t.List[str]):
        """Build the error message and store the failure details.

        Arguments:
            field_name: name of the field whose constraints failed
            value: the value that failed the checks
            constraints: names of the constraints that were not satisfied
        """
        super().__init__(
            # The trailing space keeps the two fragments from running together
            # ("...constraints ['required']which are not...").
            f'Field "{field_name}" has constraints {constraints} '
            f'which are not satisfied for value "{value}"'
        )
        self.value = value
        self.constraints = constraints
        self.field_name = field_name
class ExtensibleField(Field):
    """Table Schema field representation.

    Adds to the parent ``Field``:

    * ``title`` / ``description`` / ``default_value`` descriptor shortcuts,
    * a ``cast_value`` that collects *all* violated constraints into a single
      ``ConstraintsError`` and substitutes the default for null values,
    * ``uncast_value`` to turn a value back into its string form.
    """

    # Private attributes of the parent class, spelled in their name-mangled
    # form so they can be reached from this subclass.
    _Field__descriptor: dict
    _Field__missing_values: list
    # Used below as a mapping of constraint name -> check callable.
    _Field__check_functions: dict
    # Chosen once in __init__ from the field type; the module-level helpers
    # it is picked from return None when the value has the wrong Python type.
    _uncast_function: t.Callable[[t.Any], t.Optional[str]]

    @property
    def title(self):
        """The descriptor's ``title`` entry, or None when absent."""
        return self.descriptor.get("title", None)

    @property
    def description(self):
        """The descriptor's ``description`` entry, or None when absent."""
        return self.descriptor.get("description", None)

    @property
    def default_value(self):
        """The descriptor's ``defaultValue`` entry, or None when absent."""
        return self.descriptor.get("defaultValue", None)

    @property
    def _missing_values(self):
        return self._Field__missing_values

    @property
    def _check_functions(self):
        return self._Field__check_functions

    @property
    def _cast_function(self):
        return self._Field__cast_function

    @staticmethod
    def _unsupported_uncast(value):
        """Fallback uncast for field types without a dedicated helper.

        Always returns None, which is what the typed helpers return when
        they are handed a value of the wrong Python type.
        """
        return None

    def _get_uncast_function(self):
        """Pick the uncast helper matching this field's type.

        Field types without a dedicated helper get ``_unsupported_uncast``
        instead of raising ``KeyError``, so such fields can be constructed.
        """
        if self.type in ("date", "datetime"):
            return _get_datetime_uncast(self.format)
        return {
            "integer": _int_uncast,
            "number": _number_uncast,
            "string": _string_uncast,
            "array": _arr_uncast,
        }.get(self.type, self._unsupported_uncast)

    def __init__(self, descriptor, missing_values=config.DEFAULT_MISSING_VALUES):
        super().__init__(descriptor, missing_values)
        self._uncast_function = self._get_uncast_function()

    def _cast_value(self, value, constraints=True):
        """https://github.com/frictionlessdata/tableschema-py#field

        Arguments:
            value: the raw value to cast
            constraints: falsy to skip constraint checks, a list of
                constraint names to check only those, or any other truthy
                value to check every constraint the field defines. Names in
                the list that the field does not define are ignored.

        Returns:
            the cast value, or None when ``value`` is a missing value

        Raises:
            CastError: if the value cannot be cast to the field type/format
            ConstraintsError: if constraints are not met; lists all of them
        """
        # Null value
        if value in self._missing_values:
            value = None
        # Cast value
        cast_value = value
        if value is not None:
            cast_value = self._cast_function(value)
            if cast_value == config.ERROR:
                raise CastError(
                    (
                        'Field "{field.name}" can\'t cast value "{value}" '
                        'for type "{field.type}" with format "{field.format}"'
                    ).format(field=self, value=value)
                )
        # Check value
        if constraints:
            checks = self._check_functions
            if isinstance(constraints, list):
                # Only run requested checks the field actually defines;
                # indexing blindly raised KeyError for any other name.
                names = [name for name in constraints if name in checks]
            else:
                names = list(checks)
            invalid = [name for name in names if not checks[name](cast_value)]
            if invalid:
                raise ConstraintsError(self.name, value, constraints=invalid)
        return cast_value

    def cast_value(self, value, unsafe=False, constraints=True):
        """Cast ``value``, substituting the field default for null results.

        Arguments:
            value: the raw value to cast
            unsafe: when true, never raise; on failure return the raw value
                (or the field default when the raw value is None)
            constraints: forwarded to ``_cast_value``

        Returns:
            the cast value, or ``default_value`` when the cast result is None

        Raises:
            whatever ``_cast_value`` raises, unless ``unsafe`` is true
        """
        try:
            cast = self._cast_value(value, constraints)
            return self.default_value if cast is None else cast
        except Exception:
            if unsafe:
                return self.default_value if value is None else value
            raise

    def __uncast_value(self, value, str_fallback=False) -> str:
        """Stringifies a given field value

        Arguments:
            value: the value to stringify
            str_fallback: use ``str(value)`` instead of raising when no
                type-specific rule applies

        Raises:
            CastError: no rule applies and ``str_fallback`` is false
        """
        if (self.type == "integer" and isinstance(value, int)) or (
            self.type == "number" and isinstance(value, decimal.Decimal)
        ):
            return str(value)
        elif self.type == "string":
            if isinstance(value, str):
                return value
            else:
                # Non-string values in a string field are JSON-encoded.
                return json.dumps(value) if value is not None else ""
        elif self.type in ("date", "datetime") and isinstance(
            value, (datetime.datetime, datetime.date)
        ):
            # Any other format is handed to strftime as-is.
            if self.format not in [None, "any", "default"]:
                return value.strftime(self.format)
            else:
                return value.isoformat()
        elif self.type == "array" and isinstance(value, (list, tuple)):
            return json.dumps(value)
        elif value is None:
            return ""
        elif str_fallback:
            return str(value)
        else:
            message = f"Failed to uncast field {self.name} of type {self.type} for value '{value}' safely"
            raise CastError(message, errors=[message])

    def uncast_value(self, value, unsafe=False):
        """Stringifies a given field value

        Arguments:
            unsafe: never raise; values with no type-specific encoding fall
                back to ``str(value)``, and any error that still occurs
                yields an empty string

        Returns:
            str: the uncast value

        Raises:
            CastError: If value is not encodable and unsafe = False
        """
        try:
            return self.__uncast_value(value, str_fallback=unsafe)
        except Exception:
            if unsafe:
                return ""
            raise
def _int_uncast(value):
    """Return ``str(value)`` for ``int`` instances, None for anything else."""
    return str(value) if isinstance(value, int) else None
def _number_uncast(value):
    """Return ``str(value)`` for ``Decimal`` instances, None otherwise."""
    if not isinstance(value, decimal.Decimal):
        return None
    return str(value)
def _string_uncast(value):
    """Stringify a value for a string field.

    Strings pass through unchanged, None becomes the empty string and every
    other value is JSON-encoded.
    """
    if isinstance(value, str):
        return value
    if value is None:
        return ""
    return json.dumps(value)
def _get_datetime_uncast(type_format):
    """Build an uncast function for date/datetime values.

    The returned callable gives ``value.isoformat()`` when ``type_format`` is
    None, "any" or "default", ``value.strftime(type_format)`` for any other
    format, and None when the value is not a date or datetime.
    """

    def _datetime_uncast(value):
        if not isinstance(value, (datetime.datetime, datetime.date)):
            return None
        if type_format in [None, "any", "default"]:
            return value.isoformat()
        return value.strftime(type_format)

    return _datetime_uncast
def _arr_uncast(value):
    """JSON-encode lists and tuples; return None for any other value."""
    return json.dumps(value) if isinstance(value, (list, tuple)) else None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment