Skip to content

Instantly share code, notes, and snippets.

@cyberbikepunk
Created November 21, 2016 11:54
Show Gist options
  • Save cyberbikepunk/2a63c656c96311ad3601945c768cbc79 to your computer and use it in GitHub Desktop.
Save cyberbikepunk/2a63c656c96311ad3601945c768cbc79 to your computer and use it in GitHub Desktop.
A Processor class for the datapackage-pipelines framework (halfway done).
"""A boilerplate class for row processing.
Purposes
--------
The `Processor` class fulfills two purposes:
1. Provide boilerplate functionality
- Log the processor parameters
- Force the iteration over a given sample size
- Log a sample output of the processor
- Handle a chosen set of exceptions
2. Pass context to the row processor
- Pass row and resource indices to the processor
- Manage parameter defaults
- Pass parameters to the row processor
- Add the row and resource indices to the context on request
Usage
-----
>>> resources = [[{'foo': '1'}], [{'foo': '2'}]]
>>> processor = Processor(function=lambda row: {'foo': int(row['foo'])})
>>> new_resources = processor.process(resources)
>>> assert next(next(new_resources)) == {'foo': 1}
>>> assert next(next(new_resources)) == {'foo': 2}
"""
from copy import copy
from logging import info, warning
from petl import fromdicts, look
from common.config import LOG_SAMPLE_SIZE
from common.utilities import format_to_json
class Processor(object):
    """A boilerplate class for row processing.

    Wraps a row-level ``function`` and applies it to every row of every
    resource handed to :meth:`process`, adding parameter handling,
    exception handling and sample logging around it.

    Parameters
    ----------
    function : callable
        Called as ``function(row, **context)`` for every row; its return
        value is the output row. Defaults to the identity.
    datapackage : dict, optional
        Source of parameter overrides: top-level keys and keys of each entry
        of ``datapackage['resources']`` that are also keys of
        ``default_parameters`` override the defaults.
    enumerate_resources : bool
        If true, ``resource_index`` is added to the context.
    enumerate_rows : bool
        If true, ``row_index`` is added to the context.
    dump_sample : bool
        If true, up to ``sample_size`` output rows per resource are logged.
    sample_size : int
        Number of output rows collected for the sample log.
    exceptions : exception class or iterable of exception classes, optional
        Exceptions raised by ``function`` that are logged as a warning and
        cause the offending row to be skipped. Others propagate.
    **default_parameters
        Default keyword arguments passed to ``function``.

    Raises
    ------
    ValueError
        If ``default_parameters`` are given without a ``datapackage``.
    """

    def __init__(self,
                 function=lambda x: x,
                 datapackage=None,
                 enumerate_resources=False,
                 enumerate_rows=False,
                 dump_sample=True,
                 sample_size=LOG_SAMPLE_SIZE,
                 exceptions=None,
                 **default_parameters):
        self.function = function
        self.datapackage = datapackage
        self.enumerate_rows = enumerate_rows
        self.enumerate_resources = enumerate_resources
        self.dump_sample = dump_sample
        self.sample_size = sample_size
        # An empty tuple makes `except self.exceptions` catch nothing.
        if exceptions is None:
            self.exceptions = ()
        elif isinstance(exceptions, type):
            self.exceptions = (exceptions,)
        else:
            self.exceptions = tuple(exceptions)
        self.default_parameters = default_parameters
        self.parameter_keys = list(self.default_parameters.keys())
        self.parameters = []

        self._check_arguments()
        self._configure_parameters()
        self._log_parameters()

        # Position of the row currently being processed (read by `_context`
        # and `_warn_failed_row`).
        self._resource_index = None
        self._row_index = None

    def _check_arguments(self):
        """Reject default parameters when there is no datapackage."""
        if self.default_parameters and not self.datapackage:
            raise ValueError('Parameters passed but no datapackage')

    def _configure_parameters(self):
        """Build one parameter dict per resource listed in the datapackage.

        Precedence, lowest to highest: defaults, datapackage-level keys,
        resource-level keys.
        """
        if not self.datapackage:
            return
        package_parameters = self._read_datapackage_parameters()
        for resource in self.datapackage['resources']:
            parameters = dict(self.default_parameters)
            parameters.update(package_parameters)
            parameters.update(self._read_resource_parameters(resource))
            self.parameters.append(parameters)

    def _log_parameters(self):
        info('%s parameters =\n%s', self, format_to_json(self.parameters))

    @property
    def _context(self):
        """Return a fresh dict of keyword arguments for the current row."""
        if self._resource_index < len(self.parameters):
            # Copy the dict itself so the updates below never leak into
            # the stored per-resource parameters.
            context = copy(self.parameters[self._resource_index])
        else:
            # Resource not described by the datapackage: defaults only.
            context = copy(self.default_parameters)
        if self.enumerate_resources:
            context.update(resource_index=self._resource_index)
        if self.enumerate_rows:
            context.update(row_index=self._row_index)
        return context

    def _read_resource_parameters(self, resource):
        return {key: value
                for key, value in resource.items()
                if key in self.parameter_keys}

    def _read_datapackage_parameters(self):
        return {key: value
                for key, value in self.datapackage.items()
                if key in self.parameter_keys}

    def _log_sample(self, resource_index, rows):
        """Log ``rows`` (output rows of one resource) as a petl table."""
        petl_table = fromdicts(rows)
        table_view = look(petl_table, limit=self.sample_size)
        message = 'Sample output of %s (resource %s) =\n%s'
        info(message, self, resource_index, table_view)

    def process(self, resources):
        """Apply the processor to each row of each resource.

        Yields one lazy iterator of processed rows per input resource.
        """
        for resource_index, resource in enumerate(resources):
            yield self._process_rows(resource_index, resource)

    def _process_rows(self, resource_index, resource):
        """Yield the processed rows of one resource.

        Rows whose processing raises one of ``self.exceptions`` are logged
        and skipped.
        """
        sample = []
        for row_index, row in enumerate(resource):
            # Set the position right before use: iterators of different
            # resources may be consumed in an interleaved order.
            self._resource_index = resource_index
            self._row_index = row_index
            try:
                new_row = self.function(row, **self._context)
            except self.exceptions as error:
                self._warn_failed_row(error)
                continue
            if self.dump_sample and len(sample) < self.sample_size:
                sample.append(new_row)
                if len(sample) == self.sample_size:
                    self._log_sample(resource_index, sample)
            yield new_row
        if self.dump_sample and sample and len(sample) < self.sample_size:
            # Resource shorter than the sample size: log what was collected.
            self._log_sample(resource_index, sample)

    def _warn_failed_row(self, error):
        message = 'Failed processing resource %s row %s (%s)'
        warning(message, self._resource_index, self._row_index, error)

    def __str__(self):
        # functools.partial and callable instances have no __name__.
        return getattr(self.function, '__name__', repr(self.function))

    def __repr__(self):
        return '<Processor: {}>'.format(self)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment