Created
November 21, 2016 11:54
-
-
Save cyberbikepunk/2a63c656c96311ad3601945c768cbc79 to your computer and use it in GitHub Desktop.
A Processor class for the datapackage-pipelines framework (halfway done).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""A boilerplate class for row processing.

Purposes
--------

The `Processor` class fulfills 2 purposes:

1. Provide boilerplate functionality

    - Log the processor parameters
    - Force the iteration over a given sample size
    - Log a sample output of the processor
    - Handle a chosen set of exceptions

2. Pass context to the row processor

    - Passes row and resource indices to the processor
    - Manages parameter defaults
    - Passes parameters to the row processor
    - Adds row and resource indices to the processing context when enabled

Usage
-----

>>> resources = [[{'foo': '1'}], [{'foo': '2'}]]
>>> processor = Processor(function=int)
>>> new_resources = processor.process(resources)
>>> assert next(next(new_resources)) == 1
>>> assert next(next(new_resources)) == 2

"""
from copy import copy
from logging import info, warning

from petl import fromdicts, look

from common.config import LOG_SAMPLE_SIZE
from common.utilities import format_to_json
class Processor(object):
    """A boilerplate class for row processing.

    Wraps a row-processing ``function`` with boilerplate: parameter
    management and logging, optional row/resource enumeration, output
    sampling, and handling of a chosen set of exceptions.
    """

    def __init__(self,
                 function=lambda x: x,
                 datapackage=None,
                 enumerate_resources=False,
                 enumerate_rows=False,
                 dump_sample=True,
                 sample_size=LOG_SAMPLE_SIZE,
                 exceptions=None,
                 **default_parameters):
        """Configure the processor.

        :param function: callable applied to each row; expected to return
            a ``(new_row, stats)`` pair (see ``_process_rows``).
        :param datapackage: datapackage descriptor dict; required when
            ``default_parameters`` are passed, optional otherwise.
        :param enumerate_resources: pass ``resource_index`` to ``function``.
        :param enumerate_rows: pass ``row_index`` to ``function``.
        :param dump_sample: whether to log a sample of the output.
        :param sample_size: number of rows to sample per resource.
        :param exceptions: iterable of exception classes to catch and
            log (failed rows are skipped); defaults to none.
        :param default_parameters: fallback parameters, overridable per
            datapackage and per resource.
        :raises ValueError: if parameters are passed without a datapackage.
        """
        self.function = function
        self.datapackage = datapackage
        self.enumerate_rows = enumerate_rows
        self.enumerate_resources = enumerate_resources
        self.dump_sample = dump_sample
        self.sample_size = sample_size
        # `or ()` guards the default: tuple(None) raises TypeError.
        self.exceptions = tuple(exceptions or ())
        self.default_parameters = default_parameters
        self.parameter_keys = list(self.default_parameters.keys())
        self.parameters = []

        self._check_arguments()
        self._configure_parameters()
        self._log_parameters()

        # Mutable processing state, filled in by `process`.
        self._input_resources = None
        self._input_resource = None
        self._output_resource = None
        self._resource_index = None
        self._row = None
        self._row_index = None
        self._sample_input_rows = []
        self._sample_output_rows = []
        self._stats = {}

    def _check_arguments(self):
        # A real exception, not `assert`: asserts vanish under `python -O`.
        if self.default_parameters and not self.datapackage:
            raise ValueError('Parameters passed but no datapackage')

    def _configure_parameters(self):
        """Build one parameter dict per resource.

        Precedence (lowest to highest): constructor defaults, then
        datapackage-level values, then resource-level values.
        """
        # A datapackage is optional when no parameters were passed;
        # the original crashed here on `None['resources']`.
        if not self.datapackage:
            return
        for resource in self.datapackage['resources']:
            parameters = {}
            parameters.update(**self.default_parameters)
            parameters.update(**self._read_datapackage_parameters())
            parameters.update(**self._read_resource_parameters(resource))
            self.parameters.append(parameters)

    def _log_parameters(self):
        info('%s parameters =\n%s', self, format_to_json(self.parameters))

    @property
    def _context(self):
        """Keyword arguments passed to `function` for the current row."""
        # Shallow-copy so enumeration keys never leak into `self.parameters`.
        context = copy(self.parameters)[self._resource_index]
        if self.enumerate_resources:
            context.update(resource_index=self._resource_index)
        if self.enumerate_rows:
            context.update(row_index=self._row_index)
        return context

    def _read_resource_parameters(self, resource):
        return {key: value
                for key, value in resource.items()
                if key in self.parameter_keys}

    def _read_datapackage_parameters(self):
        return {key: value
                for key, value in self.datapackage.items()
                if key in self.parameter_keys}

    def _log_sample(self):
        petl_table = fromdicts(self._sample_output_rows)
        table_view = look(petl_table, limit=self.sample_size)
        message = 'Sample output of %s (resource %s) =\n%s'
        info(message, self, self._resource_index, table_view)

    def _extract_sample(self):
        """Extract sample rows out of the resource.

        NOTE(review): this consumes items from ``self._output_resource``
        when it is an iterator — confirm callers account for that.
        """
        for i, row in enumerate(self._output_resource):
            # Stop once the sample is full instead of draining the resource.
            if i >= self.sample_size:
                break
            self._sample_output_rows.append(row)

    def process(self, resources):
        """Apply the processor to each row of each resource."""
        for self._resource_index, self._input_resource in enumerate(resources):
            yield self._process_rows()

    def _process_rows(self):
        """Yield processed rows, skipping rows that raise a registered exception."""
        for self._row_index, self._row in enumerate(self._input_resource):
            try:
                # `function` returns a (new_row, stats) pair.
                new_row, self._stats = self.function(self._row, **self._context)
            except self.exceptions as error:
                self._warn_failed_row(error)
            else:
                # The original never yielded, so `process` produced empty
                # generators; yielding restores the documented contract.
                yield new_row

    def _warn_failed_row(self, error):
        # '%s' placeholder: the original '%w' is not a valid conversion.
        message = 'Failed processing resource %s row %s (%s)'
        warning(message, self._resource_index, self._row_index, error)

    def __str__(self):
        return self.function.__name__

    def __repr__(self):
        return '<Processor: {}>'.format(self)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment