Last active
November 17, 2016 13:36
-
-
Save cyberbikepunk/06746c48966ac0bc20180895f9ced925 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""This processor casts amounts and dates by sniffing data. | |
At this stage we assume that the data has gone through `reshape_data` and
`concatenate_identical_resources`. In other words, we assume that we have a
single resource with all the fiscal fields. The current schema differs
from a fiscal datapackage, however, in that all fields are strings. After this
processor, we can safely run the `load_fiscal_schema` processor. Values will
have the correct os-type and the data will pass validation tests.
""" | |
from logging import warning, debug | |
from datapackage_pipelines.wrapper import ingest, spew | |
from jsontableschema.exceptions import InvalidCastError | |
from jsontableschema.types import DateType | |
from jsontableschema.types import NumberType | |
from petl import values, fromdicts | |
from common.utilities import process, format_to_json | |
from common.config import SNIFFER_SAMPLE_SIZE | |
class CasterNotFound(Exception):
    """Error raised when a field's sample values could not be parsed.

    The message is built from `template`, filled with the field and the
    sample values, each rendered through `format_to_json`.
    """

    template = 'Could not parse {}.\nSample values =\n{}'

    def __init__(self, field, sample_values):
        # Render the field first, then the sample, in template order.
        rendered = [format_to_json(item) for item in (field, sample_values)]
        super(CasterNotFound, self).__init__(self.template.format(*rendered))
class BaseSniffer(object):
    """Pick a caster for a field by trying candidate parameters on a sample.

    Subclasses provide:
      * `type_class`: called with one parameter dict to build a candidate
        caster; the candidate must expose a `cast(value)` method.
      * `guesses`: the parameter dicts to try, in order.
    """

    type_class = None
    guesses = tuple()

    def __init__(self, field, sample_values):
        self.field = field
        self.sample_values = sample_values

    @property
    def caster(self):
        """Return the first candidate that casts every sample value.

        An empty sample is accepted by the first guess, since there is
        nothing to contradict it.

        Raises:
            CasterNotFound: if no guess casts the whole sample, or if
                `guesses` is empty.
        """
        # Materialise once so that a one-shot iterable is not exhausted by
        # the first guess, leaving later guesses nothing to check.
        sample = list(self.sample_values)

        for parameters in self.guesses:
            caster = self.type_class(parameters)
            for value in sample:
                try:
                    caster.cast(value)
                except InvalidCastError as error:
                    debug('Failed with %s (%s)', parameters, error)
                    break
            else:
                # No `break`: every sample value was cast successfully.
                return caster

        raise CasterNotFound(self.field, self.sample_values)
class DateSniffer(BaseSniffer):
    """Sniffer whose candidates are `DateType` casters."""

    type_class = DateType

    # Candidate parameters, listed in the order they are tried.
    # NOTE(review): confirm that DateType accepts these format strings.
    guesses = (
        dict(format='YYYY'),
        dict(format='YYYY-MM-DD'),
    )
class NumberSniffer(BaseSniffer):
    """Sniffer whose candidates are `NumberType` casters."""

    type_class = NumberType

    # Candidate decimal/group separator pairs, listed in the order they
    # are tried.
    guesses = (
        dict(decimalChar='.', groupChar=','),
        dict(decimalChar=',', groupChar=' '),
        dict(decimalChar='.', groupChar='\''),
        dict(decimalChar=',', groupChar='.'),
    )
def get_casters(datapackage, resource_sample):
    """Return a caster for each fiscal field.

    Fields are read from the schema of the first resource in `datapackage`.
    String fields map to `str`. Number and date fields use the parameters
    declared on the field when present; otherwise the parameters are sniffed
    from the field's column in `resource_sample`.

    Args:
        datapackage: descriptor holding `resources[0].schema.fields`.
        resource_sample: rows (dicts keyed by field name) used for sniffing.

    Returns:
        A dict mapping each field name to its caster.

    Raises:
        ValueError: if a field type is not string, number or date.
        CasterNotFound: if sniffing fails for a field.
    """
    fields = datapackage['resources'][0]['schema']['fields']

    def sample_of(field):
        # Built on demand: only fields that need sniffing read the sample.
        return values(fromdicts(resource_sample), field['name'])

    casters = {}

    for field in fields:
        field_type = field['type']

        if field_type == 'string':
            caster = str

        elif field_type == 'number':
            # `groupChar` is the key used by the NumberSniffer guesses.
            if field.get('decimalChar') and field.get('groupChar'):
                caster = NumberType(field)
            else:
                caster = NumberSniffer(field, sample_of(field)).caster

        elif field_type == 'date':
            if field.get('format'):
                caster = DateType(field)
            else:
                caster = DateSniffer(field, sample_of(field)).caster

        else:
            message = '{} is an invalid field type'
            raise ValueError(message.format(field_type))

        casters[field['name']] = caster

    return casters
def cast_values(row, casters=None):
    """Cast values to fiscal types.

    The row is modified in place and also returned. Falsy values (empty
    strings, `None`, ...) are left untouched. A value that cannot be cast is
    logged with a warning and replaced by `None`.

    Args:
        row: dict mapping field names to raw values.
        casters: dict mapping field names to casters. A caster is either an
            object with a `cast(value)` method or a plain callable such as
            `str`.

    Returns:
        The same `row` object, with its values cast.
    """
    for key, value in row.items():
        if not value:
            continue

        caster = casters[key]
        # Type instances are driven through `.cast(value)`; plain callables
        # such as `str` have no such method and are called directly.
        cast = getattr(caster, 'cast', caster)

        try:
            row[key] = cast(value)
        except InvalidCastError as error:
            message = 'Could not cast %s = %s to %s, returning None (%s)'
            warning(message, key, value, caster, error)
            row[key] = None

    return row
def extract_data_sample(resource, sample_size=None):
    """Extract a sample out of the data.

    Args:
        resource: iterable of rows.
        sample_size: maximum number of rows to take; defaults to
            `SNIFFER_SAMPLE_SIZE`.

    Returns:
        A `(data_sample, remaining)` tuple: the list of sampled rows and an
        iterator over the rows that were not consumed. The tuple is returned
        even when the resource holds fewer rows than the sample size, in
        which case `remaining` is exhausted.
    """
    from itertools import islice

    if sample_size is None:
        sample_size = SNIFFER_SAMPLE_SIZE

    # Work on a single iterator so the remainder resumes right after the
    # sample, whether `resource` is a list or already an iterator.
    rows = iter(resource)
    data_sample = list(islice(rows, sample_size))
    return data_sample, rows
def concatenate(data_sample, resource):
    """Yield every row of the data sample, then every row of the resource."""
    # Sources are drained strictly in this order, one after the other.
    for rows in (data_sample, resource):
        for item in rows:
            yield item
if __name__ == '__main__':
    # Script entry point: sniff casters from a sample of the first resource,
    # then cast the rows and hand the result to `spew`.

    # The first item returned by `ingest` is ignored here.
    _, datapackage_, resources_ = ingest()

    # Only the first resource is taken and sampled.
    resource_ = next(resources_)
    resource_sample_, left_over_resource_ = extract_data_sample(resource_)
    casters_ = get_casters(datapackage_, resource_sample_)

    # Sampling consumed rows from the resource, so the sample is stitched
    # back in front of the left-over rows.
    resource_ = concatenate(resource_sample_, left_over_resource_)

    # NOTE(review): the rebuilt `resource_` above is never used. `process`
    # is given `resources_`, from which the first resource has already been
    # taken by `next`. This looks like it drops the rows of the first
    # resource -- confirm against the signature of `process`.
    new_resource_ = process(resources_, cast_values, casters=casters_)
    spew(datapackage_, [new_resource_])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment