# Pipeline class to collect items using Scrapy
# Gist brutus/be91cbd60ffe35cb7e100c334d964d8c, last active June 16, 2017 18:06
"""
This pipeline collects yielded items to process them after a crawl finishes.

Overview
========

For each yielded item, :meth:`process_item` is called; it handles the storing
of items for later use. After all items are yielded, :meth:`close_spider` is
called: it assembles the data from the previously collected items
(:meth:`assemble_collection`) and dumps the results (:meth:`dump_collection`).

.. note::

    Care is taken so that you can override each step of the process and
    customize it to your needs.

Item Collection
---------------

Yielded items that match given criteria (:meth:`is_item_for_storage`) are
collected in the :attr:`items` dictionary. For that, a key and an optional sub
key are generated for each item (:meth:`get_key`, :meth:`get_subkey`), and the
item is then cast to a storable format (with :meth:`get_data`).

To control how data is collected, good places to override are
:meth:`is_item_for_storage`, :meth:`get_key`, :meth:`get_subkey` and
:meth:`get_data`. But this might not be needed…
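
The default hooks can be approximated without Scrapy. The following is a minimal stand-alone sketch that assumes plain dicts as items. ``simple_slug`` is a hypothetical stand-in that only approximates ``slugify.slugify``. The functions mimic the methods of the same names but are not the pipeline's API:

```python
KEY_ATTR = 'collection_index'  # mirrors the ITEM_COLLECTION_KEY_ATTR default


def simple_slug(text):
    # hypothetical stand-in for slugify.slugify: lowercase, and runs of
    # non-alphanumeric characters become a single '-'
    cleaned = ''.join(c if c.isalnum() else ' ' for c in str(text).lower())
    return '-'.join(cleaned.split())


def is_item_for_storage(item):
    # default criterion: only items carrying the key field are collected
    return KEY_ATTR in item


def get_key(item):
    # default key: the (slugified) value of the key field
    return simple_slug(item[KEY_ATTR])


def get_subkey(item):
    # default sub key: the (slugified) class name of the item
    return simple_slug(type(item).__name__)


def get_data(item, del_key=True):
    data = dict(item)  # copy, so the original item is left untouched
    if del_key:
        del data[KEY_ATTR]
    return data


item = {'collection_index': 'Foo Bar', 'title': 'Hello'}
print(is_item_for_storage(item), get_key(item), get_subkey(item), get_data(item))
# prints: True foo-bar dict {'title': 'Hello'}
```

With a real Scrapy item class, such as a hypothetical ``ProductItem``, the sub key would be ``productitem`` instead of ``dict``.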

Building the Collection
-----------------------

To create the collection, :meth:`assemble_data` is called for each key–value
pair in :attr:`items`. The result of that call is then passed to
:meth:`verify_data`. If no exception is raised, the result of that call is
added to :attr:`collection`.

So :meth:`assemble_data` is probably the first method you should override to
customize your item collection. It gets called for each collected key with all
the info collected for it, and it should return the data in any way you see
fit for the key.

You could also override :meth:`verify_data` to verify and / or even change the
data returned from :meth:`assemble_data`. If you want to skip storing a result,
raise :exc:`ItemVerificationError` from there.
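
The following is a minimal sketch of this assemble/verify loop in plain Python. It assumes no sub keys are used, so each key holds a list of dicts. ``assemble_data`` and ``verify_data`` below are hypothetical overrides written as plain functions, and ``ItemVerificationError`` is a local stand-in for the exception defined in this module:

```python
class ItemVerificationError(Exception):
    # local stand-in for the exception of the same name in this module
    pass


def assemble_data(data, key):
    # hypothetical override: merge all dicts collected for a key into one
    merged = {}
    for entry in data:
        merged.update(entry)
    return merged


def verify_data(data, key):
    # hypothetical rule: a result without a 'title' is skipped
    if 'title' not in data:
        raise ItemVerificationError(f"'{key}' has no title")
    return data


def assemble_collection(items):
    collection, skipped = {}, []
    for key, data in items.items():
        try:
            result = verify_data(assemble_data(data, key), key)
        except ItemVerificationError:
            skipped.append(key)  # the pipeline logs a warning instead
            continue
        collection[key] = result
    return collection, skipped


collection, skipped = assemble_collection({
    'key-01': [{'title': 'A'}, {'price': 3}],
    'key-02': [{'price': 5}],
})
```

Skipped keys never reach the collection. The pipeline itself also counts them under ``collected/collection/skipped`` in the crawl stats.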

Data Structures
===============

This class uses two structures: :attr:`items` to collect the items as they come
in, and :attr:`collection` that holds the output generated from the items.

Items
-----

The items are collected in a dictionary.

For each item a key is generated (:meth:`get_key`) that points to a list, which
collects the data (:meth:`get_data`) of every item with the same *key*::

    items = {
        'key-01': [
            'item-data-01',
            'item-data-02',
            'item-data-03',
        ],
        'key-02': [
            'item-data-01',
        ],
        'key-03': [
            'item-data-01',
            'item-data-02',
        ],
    }

If a sub key is used (:meth:`get_subkey` returns something other than `None`),
this looks a little bit different::

    items = {
        'key-01': {
            'subkey-01': [
                'item-data-01',
                'item-data-02',
            ],
            'subkey-02': [
                'item-data-01',
            ],
        },
        'key-02': {
            'subkey-01': [
                'item-data-01',
            ],
            'subkey-02': [
                'item-data-01',
            ],
        },
        'key-03': {
            'subkey-02': [
                'item-data-01',
                'item-data-02',
            ],
        },
    }
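
Both layouts can be reproduced in a few lines of plain Python. The following is a minimal sketch of the same grouping that :meth:`store_data` performs, assuming keys and sub keys have already been generated. The ``collect`` function and its ``(key, subkey, data)`` tuples are hypothetical:

```python
def collect(entries):
    # entries: hypothetical (key, subkey, data) tuples in arrival order
    items = {}
    for key, subkey, data in entries:
        if subkey is None:
            # no sub key: key -> list of data
            items.setdefault(key, []).append(data)
        else:
            # sub key: key -> sub key -> list of data
            items.setdefault(key, {}).setdefault(subkey, []).append(data)
    return items


flat = collect([
    ('key-01', None, 'item-data-01'),
    ('key-02', None, 'item-data-01'),
    ('key-01', None, 'item-data-02'),
])
nested = collect([
    ('key-01', 'subkey-01', 'item-data-01'),
    ('key-01', 'subkey-02', 'item-data-01'),
    ('key-01', 'subkey-01', 'item-data-02'),
])
```

This sketch treats only `None` as "no sub key". :meth:`store_data` checks the sub key's truthiness instead, so an empty string is ignored there as well.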

Data List
~~~~~~~~~

One important point here is that the item data is always stored in a list, so
that multiple items with the same key / sub key won't interfere.

You can use the :func:`flatten` helper function to deal with this in your code.

Collection
----------

The collection is either a dictionary or a list, depending on the
:attr:`as_list` setting. If :attr:`as_list` is set, it is a list and the
results for each key are appended to it. Otherwise it is a dictionary and each
key points to the result for that key.
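
The following is a minimal sketch of the two collection shapes, using hypothetical per-key results. It illustrates one consequence: with :attr:`as_list` set, the keys themselves are not part of the output. An overridden :meth:`assemble_data` would therefore have to add them to each result if you need them. This sketch does so with a hypothetical ``key`` field:

```python
import json

# hypothetical results returned by assemble_data, one per key, in the
# order the keys were first collected
results = {'key-02': {'title': 'B'}, 'key-01': {'title': 'A'}}

# as_list not set: each key points to its result
as_dict = dict(results)

# as_list set: results are appended in order and the keys would be lost,
# so here the (hypothetical) key field is merged into each result
as_list = [dict(result, key=key) for key, result in results.items()]

# serialized with the same json.dump options that dump_collection uses
print(json.dumps(as_dict, indent=2, sort_keys=True, ensure_ascii=False))
print(json.dumps(as_list, indent=2, sort_keys=True, ensure_ascii=False))
```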

Setup
=====

1. Create a subclass from this (e.g. in your ``pipelines.py`` file)::

       import collection_pipeline

       class MyCollector(collection_pipeline.CollectItemsPipeline):
           pass  # override stuff here

2. And set it in your ``settings.py`` file::

       ITEM_PIPELINES = {
           'mypackage.pipelines.MyCollector': 800,
       }

3. You can also configure some things there::

       ITEM_COLLECTION_FILENAME = 'my_collection.json'
       ITEM_COLLECTION_DEL_KEY = True
"""

import json

import slugify
from scrapy.exceptions import DropItem


class ItemKeyError(Exception):
    """Raised if no key can be generated for an item."""


class ItemVerificationError(Exception):
    """Raised if the verification for an item failed."""


class CollectItemsPipeline(object):
    """
    Collects items to handle them later.

    For each yielded item, :meth:`process_item` is called; it handles the
    storing of items for later use.

    After all items are yielded, :meth:`close_spider` is called: it assembles
    the data from the previously collected items (:meth:`assemble_collection`)
    and dumps the results (:meth:`dump_collection`).

    You should override :meth:`assemble_data` to customize your item
    collection. It gets called for each collected key with all the info
    collected for it, and it should return the data in any way you see fit
    for the key.

    You could also override :meth:`verify_data` to verify and / or even
    change the data returned from :meth:`assemble_data`. If you want to skip
    storing a result, raise :exc:`ItemVerificationError` from there.
    """

    @classmethod
    def from_crawler(cls, crawler):
        """
        Creates an instance from the *crawler* settings.

        Here is a list of the supported settings and their default values (see
        :meth:`__init__` for details)::

            ITEM_COLLECTION_FILENAME = 'data.json'
            ITEM_COLLECTION_KEY_ATTR = 'collection_index'
            ITEM_COLLECTION_SLUGIFY_KEYS = True
            ITEM_COLLECTION_DEL_KEY = False
            ITEM_COLLECTION_DROP_ITEMS = False
            ITEM_COLLECTION_AS_LIST = False
            ITEM_COLLECTION_RAISE_ON_ERROR = False
            ITEM_COLLECTION_STATS_FOR_KEY = False
            ITEM_COLLECTION_STATS_FOR_SUBKEY = False
        """
        s = crawler.settings
        filename = s.get('ITEM_COLLECTION_FILENAME', 'data.json')
        key_attr = s.get('ITEM_COLLECTION_KEY_ATTR', 'collection_index')
        # getbool() also parses string values such as '0' or 'False',
        # e.g. when a setting is passed on the command line with -s
        slugify_keys = s.getbool('ITEM_COLLECTION_SLUGIFY_KEYS', True)
        del_key = s.getbool('ITEM_COLLECTION_DEL_KEY', False)
        drop_items = s.getbool('ITEM_COLLECTION_DROP_ITEMS', False)
        as_list = s.getbool('ITEM_COLLECTION_AS_LIST', False)
        raise_on_error = s.getbool('ITEM_COLLECTION_RAISE_ON_ERROR', False)
        stats_for_key = s.getbool('ITEM_COLLECTION_STATS_FOR_KEY', False)
        stats_for_subkey = s.getbool('ITEM_COLLECTION_STATS_FOR_SUBKEY', False)
        return cls(
            crawler.stats, filename, key_attr, slugify_keys, del_key,
            drop_items, as_list, raise_on_error, stats_for_key,
            stats_for_subkey
        )

    def __init__(
        self, stats, filename, key_attr, slugify_keys=True, del_key=False,
        drop_items=False, as_list=False, raise_on_error=False,
        stats_for_key=False, stats_for_subkey=False
    ):
        """
        Configures the instance and creates data structures for storage.

        Args:
            stats: the stats collector of the crawler
            filename: the results will be dumped to this path / filename
            key_attr: the item field used to generate the keys for items
            slugify_keys: slugify the item keys (and sub keys)
            del_key: remove the key field from the stored item data
            drop_items: drop stored items instead of returning them
            as_list: use a list as collection instead of a dictionary
            raise_on_error: raise exceptions on errors
            stats_for_key: collect stats for item keys
            stats_for_subkey: collect stats for item sub keys
        """
        self.stats = stats
        self.filename = filename
        self.key_attr = key_attr
        self.slugify_keys = slugify_keys
        self.del_key = del_key
        self.drop_items = drop_items
        self.as_list = as_list  # must be set before reset() is called
        self.raise_on_error = raise_on_error
        self.stats_for_key = stats_for_key
        self.stats_for_subkey = stats_for_subkey
        self.reset()

    def reset(self):
        """
        Creates data structures for storage.

        Creates an empty :attr:`items` dictionary and an empty
        :attr:`collection` (a list if :attr:`as_list` is set, else a
        dictionary).
        """
        self.items = {}
        self.collection = [] if self.as_list else {}

    def process_item(self, item, spider):
        """
        Stores and returns *item* depending on settings.

        This method is called for each item in the pipeline. If the *item*
        should not be stored, it is returned immediately and unchanged. If it
        should be stored, it is put in the :attr:`items` dictionary under a
        generated key. Stored items are returned too, as long as
        :attr:`drop_items` is not set.

        .. important::

            If no *key* can be generated for an *item*, the item won't be
            stored (and is therefore never dropped). Set
            :attr:`raise_on_error` if you want :exc:`ItemKeyError` to be
            raised in that case.

        Raises:
            DropItem: for stored items if :attr:`drop_items` is set.
        """
        # set current spider
        self.spider = spider
        # store item?
        store_item = self.is_item_for_storage(item)
        if store_item:
            self.stats.inc_value('collected/included')
            try:
                self.store_item(item)
            except ItemKeyError as e:
                if self.raise_on_error:
                    raise
                spider.logger.warning(e)
                self.stats.inc_value('collected/errors')
                self.stats.inc_value('collected/errors/missing_keys')
                store_item = False  # not stored, so it must not be dropped
        else:
            self.stats.inc_value('collected/excluded')
        # return or drop item?
        if self.drop_items and store_item:
            self.stats.inc_value('collected/dropped')
            raise DropItem(f"stored <{type(item).__name__}>")
        self.stats.inc_value('collected/returned')
        return item

    def close_spider(self, spider):
        """
        Assembles the collection from the stored items and dumps it.

        This method is called after all items are collected. It calls
        :meth:`assemble_collection` to create :attr:`collection` from
        :attr:`items` and then dumps the result with :meth:`dump_collection`.
        """
        # store current spider
        self.spider = spider
        # assemble collection (self.items holds one entry per key)
        key_count = len(self.items)
        self.spider.logger.info(
            f"Assembling items for {key_count} keys…"
        )
        self.assemble_collection()
        # dump collection
        collection_count = len(self.collection)
        self.spider.logger.info(
            f"Writing {collection_count} entries to '{self.filename}'…"
        )
        self.dump_collection()

    def store_item(self, item):
        """
        Stores *item* in :attr:`items`.

        1. Generate a key for the item with :meth:`get_key`.
        2. Generate a sub key for the item with :meth:`get_subkey`.
        3. "Convert" the item to a storable format with :meth:`get_data`.
        4. Store the result under key / sub key in :attr:`items`.
        5. Update statistics.

        .. note::

            If `None` is returned for a sub key, it is not used and the item
            is stored directly under the generated key.

        .. note::

            The item itself is not stored, only whatever :meth:`get_data`
            returns for it.

        Raises:
            ItemKeyError: if no key can be generated for *item*.
        """
        # store item
        key = self.get_key(item)
        is_new_key = key not in self.items
        subkey = self.get_subkey(item, key)
        data = self.get_data(item, key, subkey)
        self.store_data(data, key, subkey)
        # update stats
        self.stats.inc_value('collected/items')
        if is_new_key:
            self.stats.inc_value('collected/items/keys')
        if self.stats_for_key:
            self.stats.inc_value(f'collected/items/item/{key}')
        if self.stats_for_subkey and subkey:
            self.stats.inc_value(f'collected/items/item/{key}/{subkey}')

    def store_data(self, data, key, subkey=None):
        """
        Stores *data* in :attr:`items` under *key* / *subkey*.

        .. note::

            If *subkey* is `None`, it is not used and the data is stored
            directly under *key*.
        """
        if subkey:
            # key -> sub key -> list of data
            self.items.setdefault(key, {}).setdefault(subkey, []).append(data)
        else:
            # key -> list of data
            self.items.setdefault(key, []).append(data)

    def assemble_collection(self):
        """
        Assembles :attr:`collection` from :attr:`items`.

        The data for each key in :attr:`items` is passed to
        :meth:`assemble_data` and then to :meth:`verify_data`. The verified
        results are stored in :attr:`collection`. If an
        :exc:`ItemVerificationError` is raised, that key is skipped.

        How the results are stored in :attr:`collection` depends on
        :attr:`as_list`: they are either appended to a list or stored under
        their key in a dictionary.

        See :meth:`assemble_data` and :meth:`verify_data` for details.
        """
        for key, data in self.items.items():
            try:
                data = self.assemble_data(data, key)
                data = self.verify_data(data, key)
            except ItemVerificationError as e:
                self.spider.logger.warning(e)
                self.stats.inc_value('collected/errors')
                self.stats.inc_value('collected/errors/verification')
                self.stats.inc_value('collected/collection/skipped')
                continue
            if self.as_list:
                self.collection.append(data)
            else:
                # keys in self.items are unique: each key gets one result
                self.collection[key] = data
            self.stats.inc_value('collected/collection/stored')

    def dump_collection(self):
        """
        Dumps the contents of :attr:`collection` as JSON to :attr:`filename`.
        """
        # ensure_ascii=False writes non-ASCII text, so fix the file encoding
        with open(self.filename, 'w', encoding='utf-8') as fh:
            json.dump(
                self.collection, fh, indent=2, sort_keys=True,
                ensure_ascii=False
            )

    def is_item_for_storage(self, item):
        """
        Returns `True` if *item* should be stored, else `False`.

        The default returns `True` if :attr:`key_attr` is in *item*.
        """
        return self.key_attr in item

    def get_key(self, item):
        """
        Returns the key for *item*.

        Uses the value of the *item* field named by :attr:`key_attr`. If
        :attr:`slugify_keys` is set, the value will be "slugified".

        Raises:
            ItemKeyError: if no key can be generated for the *item*.
        """
        try:
            key = item[self.key_attr]
        except KeyError:
            raise ItemKeyError(
                f"item has no attribute named '{self.key_attr}': {item}"
            )
        if self.slugify_keys:
            key = slugify.slugify(key)
        return key

    def get_subkey(self, item, key):
        """
        Returns the sub key for the *item* with *key*.

        Uses the class name of *item*. If :attr:`slugify_keys` is set, the
        value will be "slugified".

        .. note::

            If `None` is returned as a sub key, it is not used and the item
            is stored directly under the key.
        """
        subkey = type(item).__name__
        if self.slugify_keys:
            subkey = slugify.slugify(subkey)
        return subkey

    def get_data(self, item, key, subkey):
        """
        Returns *item* in a form fit for storage.

        Casts *item* to a new dictionary, so the original item (which may
        still be returned by :meth:`process_item`) is left untouched. If
        :attr:`del_key` is set, :attr:`key_attr` is removed from the
        resulting dictionary.
        """
        data = dict(item)  # always copy, never mutate the yielded item
        if self.del_key:
            data.pop(self.key_attr, None)
        return data

    def assemble_data(self, data, key):
        """
        Returns the assembling results from *data* for *key*.

        *data* will be the contents of *key*: either a dictionary with all
        the sub keys pointing to the collected data, or the list of collected
        data directly if no sub keys are used.

        This default implementation just returns *data* unchanged.
        """
        if isinstance(data, dict):
            subkeys = ', '.join(sorted(data.keys()))
            self.spider.logger.debug(f"assembling '{key}' with {subkeys}.")
        else:
            self.spider.logger.debug(
                f"assembling '{key}' with {len(data)} entries."
            )
        return data

    def verify_data(self, data, key):
        """
        Returns cleaned *data* for *key*.

        This default implementation just returns *data* unchanged.

        Raises:
            ItemVerificationError: on verification errors.
        """
        self.spider.logger.debug(f"verifying '{key}'")
        return data


def flatten(source_list, key=None):
    """
    Returns the first element from *source_list*.

    This is a helper function to get elements from the stored items. If
    *source_list* contains more than one item, all the items need to be
    equal. You can use this as a shortcut for cases where you don't expect
    differing values for your items.

    If *source_list* is a list of dictionaries, you can filter it further: if
    *key* is set, only the values stored under *key* are considered.

    Raises:
        ItemVerificationError: if there are no elements (with the given
            *key*).
        ItemVerificationError: if not all elements are equal.

    Examples
    --------

    For this to work, a list needs to have at least one element::

        >>> flatten([1, ])
        1

    Otherwise :exc:`ItemVerificationError` is raised::

        >>> flatten([])  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ItemVerificationError: no item in source: []

    You can catch it to set a default or something. Anyway, multiple values
    are okay, as long as they are equal::

        >>> flatten([1, 1, 1])
        1

    But :exc:`ItemVerificationError` will be raised if they are different::

        >>> flatten([1, 2, 3])  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ItemVerificationError: unequal values: [1, 2, 3]

    This works the same with a list of dictionaries::

        >>> d1 = d2 = dict(foo='test 1 foo', bar='test 1 bar')
        >>> flatten([d1, d2])
        {'foo': 'test 1 foo', 'bar': 'test 1 bar'}

    And again only if they are equal::

        >>> d2 = dict(foo='test 2 foo', bar='test 2 bar')
        >>> flatten([d1, d2])  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ItemVerificationError: unequal values: ...

    You can also filter on dictionary keys::

        >>> d1 = d2 = dict(foo='test 1 foo', bar='test 1 bar')
        >>> flatten([d1, d2], key='foo')
        'test 1 foo'
        >>> flatten([d1, d2], key='bar')
        'test 1 bar'
        >>> flatten([d1, d2], key='x')  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ItemVerificationError: key 'x' not found ...

    But you can't filter on keys with non-dictionaries::

        >>> flatten([1, 1, 1], key='foo')  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ItemVerificationError: ...
    """
    assert isinstance(source_list, list)
    # get all *key* elements
    if key is None:
        values = source_list
        err_msg_empty = f"no item in source: {source_list}"
        err_msg_multi = f"unequal values: {source_list}"
    else:
        try:
            values = [d[key] for d in source_list if key in d]
        except TypeError:
            err_msg_key = 'you can only filter by key in dictionaries'
            raise ItemVerificationError(err_msg_key)
        err_msg_empty = f"key '{key}' not found in source: {source_list}"
        err_msg_multi = f"multiple unequal values for '{key}': {values}"
    # return the first element, but only if all elements are equal
    if not values:
        raise ItemVerificationError(err_msg_empty)
    if values.count(values[0]) != len(values):
        raise ItemVerificationError(err_msg_multi)
    return values[0]