Skip to content

Instantly share code, notes, and snippets.

@cyberbikepunk
Last active November 17, 2016 13:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cyberbikepunk/06746c48966ac0bc20180895f9ced925 to your computer and use it in GitHub Desktop.
Save cyberbikepunk/06746c48966ac0bc20180895f9ced925 to your computer and use it in GitHub Desktop.
"""This processor casts amounts and dates by sniffing data.
At this stage we assume that the data has gone through `reshape_data` and
`concatenate_identical_resources`. In other words, we assume that we have a
single resource with all the fiscal fields. The current schema differs
from a fiscal datapackage, however, in that all fields are strings. After this
processor, we can safely run the `load_fiscal_schema` processor. Values will
have the correct os-type and the data will pass validation tests.
"""
from itertools import islice
from logging import warning, debug

from datapackage_pipelines.wrapper import ingest, spew
from jsontableschema.exceptions import InvalidCastError
from jsontableschema.types import DateType
from jsontableschema.types import NumberType
from petl import values, fromdicts

from common.utilities import process, format_to_json
from common.config import SNIFFER_SAMPLE_SIZE
class CasterNotFound(Exception):
    """Error raised when no candidate caster could be found for a field.

    The message embeds the field and the sample values, each rendered
    with `format_to_json`.
    """

    template = 'Could not parse {}.\nSample values =\n{}'

    def __init__(self, field, sample_values):
        field_json, samples_json = map(format_to_json, (field, sample_values))
        super(CasterNotFound, self).__init__(
            self.template.format(field_json, samples_json)
        )
class BaseSniffer(object):
    """Find a caster for a field by trying candidate parameters on a sample.

    Subclasses set `type_class` (the class instantiated with each candidate)
    and `guesses` (the candidate parameter sets, tried in order).
    """

    type_class = None
    guesses = tuple()

    def __init__(self, field, sample_values):
        self.field = field
        self.sample_values = sample_values

    @property
    def caster(self):
        """Return the first candidate caster that casts every sample value.

        Raises:
            CasterNotFound: if each guess fails on at least one sample value,
                or if there are no guesses at all.
        """
        for parameters in self.guesses:
            caster = self.type_class(parameters)
            try:
                for value in self.sample_values:
                    caster.cast(value)
            except InvalidCastError as error:
                debug('Failed with %s (%s)', parameters, error)
                # This guess is wrong for the sample: move on to the next
                # one instead of returning a caster known to fail.
                continue
            return caster

        raise CasterNotFound(self.field, self.sample_values)
class DateSniffer(BaseSniffer):
    """Sniffer that builds `DateType` casters from candidate date formats."""

    type_class = DateType
    # Candidate formats, in the order they are tried.
    guesses = tuple(
        {'format': pattern} for pattern in ('YYYY', 'YYYY-MM-DD')
    )
class NumberSniffer(BaseSniffer):
    """Sniffer that builds `NumberType` casters from separator candidates."""

    type_class = NumberType
    # Candidate (decimal character, group character) pairs, in trial order.
    guesses = tuple(
        {'decimalChar': decimal_char, 'groupChar': group_char}
        for decimal_char, group_char in (
            ('.', ','),
            (',', ' '),
            ('.', "'"),
            (',', '.'),
        )
    )
def get_casters(datapackage, resource_sample):
    """Return a caster for each fiscal field.

    The fields are read from the schema of the first resource of the
    datapackage. For each field:

    * `string` fields are cast with `str`;
    * `number` fields use `NumberType(field)` when the field declares both
      `decimalChar` and `groupChar`, otherwise the caster is sniffed from
      the sample with `NumberSniffer`;
    * `date` fields use `DateType(field)` when the field declares a
      `format`, otherwise the caster is sniffed with `DateSniffer`.

    Args:
        datapackage: the datapackage descriptor (a dict).
        resource_sample: a list of row dicts used for sniffing.

    Returns:
        A dict mapping each field name to its caster.

    Raises:
        ValueError: if a field has a type other than string, number or date.
        CasterNotFound: if sniffing fails for a field.
    """
    fields = datapackage['resources'][0]['schema']['fields']
    casters = {}

    for field in fields:
        name = field['name']
        field_type = field['type']

        def sample_values(field_name=name):
            # Only built when sniffing is needed for this field.
            return values(fromdicts(resource_sample), field_name)

        if field_type == 'string':
            caster = str
        elif field_type == 'number':
            # `groupChar` is the key used by the sniffer guesses.
            if field.get('decimalChar') and field.get('groupChar'):
                caster = NumberType(field)
            else:
                caster = NumberSniffer(field, sample_values()).caster
        elif field_type == 'date':
            if field.get('format'):
                caster = DateType(field)
            else:
                caster = DateSniffer(field, sample_values()).caster
        else:
            message = '{} is an invalid field type'
            raise ValueError(message.format(field_type))

        casters[name] = caster

    return casters
def cast_values(row, casters=None):
    """Cast values to fiscal types.

    Falsy values (e.g. empty strings or None) are left untouched. A value
    that cannot be cast is logged with a warning and replaced by None.

    Args:
        row: a dict of field name to value; it is modified in place.
        casters: a mapping of field name to caster. A caster is either a
            plain callable (e.g. `str`) or an object exposing a `cast`
            method.

    Returns:
        The same `row` object, with its values cast.
    """
    for key, value in row.items():
        if not value:
            continue

        caster = casters[key]
        # Type objects convert through `.cast`; plain callables such as
        # `str` are invoked directly.
        cast = getattr(caster, 'cast', caster)

        try:
            row[key] = cast(value)
        except InvalidCastError as error:
            message = 'Could not cast %s = %s to %s, returning None (%s)'
            warning(message, key, value, caster, error)
            row[key] = None

    return row
def extract_data_sample(resource, sample_size=None):
    """Extract a sample out of the data.

    Args:
        resource: an iterable of rows.
        sample_size: the maximum number of rows to sample; defaults to
            `SNIFFER_SAMPLE_SIZE`.

    Returns:
        A `(data_sample, left_over)` tuple: the list of sampled rows (shorter
        than `sample_size` when the resource runs out) and an iterator over
        the rows that were not sampled.
    """
    if sample_size is None:
        sample_size = SNIFFER_SAMPLE_SIZE

    # Work on a single iterator so the left-over rows never repeat the
    # sample, even when `resource` is a re-iterable such as a list.
    rows = iter(resource)
    data_sample = list(islice(rows, sample_size))
    return data_sample, rows
def concatenate(data_sample, resource):
    """Yield every sampled row, then every row left in the resource."""
    for rows in (data_sample, resource):
        for row in rows:
            yield row
if __name__ == '__main__':
    _, datapackage_, resources_ = ingest()

    # Only the first resource is handled: the module docstring assumes the
    # upstream processors have left a single resource.
    resource_ = next(resources_)

    # Sniff the casters on a sample, then stitch the sample back in front
    # of the remaining rows so that no row is lost.
    resource_sample_, left_over_resource_ = extract_data_sample(resource_)
    casters_ = get_casters(datapackage_, resource_sample_)
    resource_ = concatenate(resource_sample_, left_over_resource_)

    # Feed the rebuilt resource to `process`; `resources_` has already had
    # its first resource consumed by `next()` above.
    # NOTE(review): assumes `process` takes an iterable of resources and
    # returns an iterable of processed resources - confirm against
    # common.utilities.process.
    new_resources_ = process([resource_], cast_values, casters=casters_)
    spew(datapackage_, new_resources_)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment