Created
December 11, 2016 22:54
-
-
Save cyberbikepunk/2db56d198e7e22c6a89ca9d1bf6ed901 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Boilerplate functionality for processors.""" | |
import logging | |
import copy | |
import petl | |
import json | |
import itertools | |
import collections | |
from datapackage_pipelines.wrapper.wrapper import processor | |
def _override_parameters(pipeline_parameters,
                         datapackage, resource_index,
                         processor_name=None):
    """Layer datapackage and resource parameters over the pipeline ones.

    Precedence (lowest to highest): pipeline parameters, datapackage-level
    parameters, resource-level parameters. The input dict is deep-copied,
    so callers' parameters are never mutated.

    :param pipeline_parameters: baseline parameters from `pipeline-spec.yaml`
    :param datapackage: the datapackage descriptor (a dict)
    :param resource_index: index into ``datapackage['resources']``
    :param processor_name: key under which overrides are stored; defaults to
        the current processor's name, resolved lazily at call time instead of
        once at import time (the original ``processor_name=processor()``
        default froze the value when the module was imported)
    :raises: `TypeError` if any parameter set is not a `dict`
    :returns: the merged parameters dict
    """
    if processor_name is None:
        processor_name = processor()
    parameters = copy.deepcopy(pipeline_parameters)
    datapackage_parameters = datapackage.get(processor_name, {})
    resource_properties = datapackage['resources'][resource_index]
    resource_parameters = resource_properties.get(processor_name, {})
    _check_processing_parameters(pipeline_parameters,
                                 datapackage_parameters,
                                 resource_parameters)
    parameters.update(datapackage_parameters)
    parameters.update(resource_parameters)
    return parameters
def _check_processing_parameters(*configuration): | |
message = 'Parameters must be a dict, found {}' | |
for parameters in configuration: | |
if not isinstance(parameters, dict): | |
raise TypeError(message.format(parameters)) | |
def _write_to_log(parameters, sample_rows, resource_index):
    """Log the effective parameters and a tabular sample of processed rows.

    :param parameters: the merged parameters dict (JSON-serializable)
    :param sample_rows: a list of row dicts to preview
    :param resource_index: index of the resource just processed
    """
    pretty_parameters = json.dumps(parameters, ensure_ascii=False, indent=4)
    sample_table = petl.look(petl.fromdicts(sample_rows))
    logging.info('Processed resource %s', resource_index)
    logging.info('Parameters = %s', pretty_parameters)
    logging.info('Sample output: \n%s', sample_table)
def _get_sample_rows(row_generator, sample_size): | |
sample_rows = [] | |
for i, sample_row in enumerate(row_generator): | |
sample_rows.append(sample_row) | |
if i + 1 == sample_size: | |
return sample_rows | |
else: | |
return sample_rows | |
Index = collections.namedtuple('Index', ['resource', 'row']) | |
def process(resources,
            row_processor,
            pass_context=False,
            sample_size=15,
            verbose=True,
            parameters=None,
            datapackage=None,
            nothing_to_report='_pass'):
    """Apply a row processor to each row of each datapackage resource.

    The function provides the following boilerplate functionality:

    * Override pipeline parameters with datapackage parameters
    * Record the parameters to the log
    * Force iteration over a sample of rows
    * Dump the data samples to the log
    * Collect statistics

    :param resources: a generator of generators of rows
    :param row_processor: a function that processes one row of data
    :param pass_context: whether to pass over the resource and row indices
    :param sample_size: the size of the data sample
    :param verbose: whether to log the parameters and the data sample
    :param parameters: the processing parameters found in `pipeline-spec.yaml`
        (defaults to an empty dict; a `None` sentinel replaces the original
        mutable ``{}`` default so no dict is shared across calls)
    :param datapackage: required to override pipeline parameters
    :param nothing_to_report: tells the stats collector to skip this row
    :raises: `TypeError` if any of the parameters sets is not a `dict`
    :returns: the new_resource (a generator of generators)
    :returns: the processor report (a list of dicts)
    """
    if parameters is None:
        parameters = {}
    processor_report = []

    def process_resources(pipeline_parameters):
        """Return a generator of resources."""
        for resource_index, resource in enumerate(resources):
            resource_report = {}
            # Recompute the effective parameters from the *pipeline* baseline
            # on every iteration: the original rebound its loop variable, so
            # one resource's overrides leaked into the next resource's base.
            effective_parameters = pipeline_parameters
            if datapackage:
                effective_parameters = _override_parameters(pipeline_parameters,
                                                            datapackage,
                                                            resource_index)

            def process_rows(resource_, parameters_):
                """Return a generator of rows."""
                for row_index, row in enumerate(resource_):
                    index = Index(resource_index, row_index)
                    context = (index,) if pass_context else ()
                    row, row_report = row_processor(row, *context, **parameters_)
                    if row_report != nothing_to_report:
                        resource_report[index.row] = row_report
                    yield row
                # Appended only once the caller has consumed every row,
                # so the per-resource report is complete at that point.
                processor_report.append(resource_report)

            # Parameters are passed explicitly (not closed over) so each row
            # generator is bound to its own resource's effective parameters.
            row_generator = process_rows(resource, effective_parameters)
            # Pull a sample eagerly, then chain it back in front of the
            # generator so the consumer still sees the full resource.
            sample_rows = _get_sample_rows(row_generator, sample_size)
            yield itertools.chain(sample_rows, row_generator)
            if verbose:
                _write_to_log(effective_parameters, sample_rows, resource_index)

    return process_resources(parameters), processor_report
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment