Pipeline class to collect items using Scrapy
"""
This pipeline collects yielded items to process them after a crawl finishes.
Overview
========
For each yielded item :meth:`process_item` is called — and handles the storing
of items for later use. After all items are yielded, :meth:`close_spider` is
called and assembles the data from the previously collected items
(:meth:`assemble_collection`) and dumps the results (:meth:`dump_collection`).
.. note::
Care is taken, so that you can overwrite each step of the process and
customize it to your needs.
Item Collection
---------------

Yielded items that match the given criteria (:meth:`is_item_for_storage`) are
collected in the :attr:`items` dictionary: a key and an optional sub key are
generated for each item (:meth:`get_key`, :meth:`get_subkey`) and the item is
then cast to a storable format (with :meth:`get_data`).

To control how data is collected, good places to overwrite are
:meth:`is_item_for_storage`, :meth:`get_key`, :meth:`get_subkey` and
:meth:`get_data`. But this might not be needed…

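For illustration, a minimal sketch of such a subclass (the ``category`` field
used here is made up for this example)::

    class CategoryCollector(CollectItemsPipeline):

        def is_item_for_storage(self, item):
            # only collect items that carry a category to group by
            return 'category' in item

        def get_key(self, item):
            # group items by their category instead of the default key attribute
            return item['category']

        def get_subkey(self, item, key):
            # don't use sub keys; store item data directly under the key
            return None
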
Building the Collection
-----------------------

To create the collection, :meth:`assemble_data` is called for each key–value
pair in :attr:`items`. The result of that call is then sent to
:meth:`verify_data` and, if no exception is raised, the result from that call
is added to :attr:`collection`.

So :meth:`assemble_data` probably is the first method you should overwrite to
customize your item collection: it gets called for each collected key with all
the info collected for it. It should return the data in any way you see fit
for the key.

You could also overwrite :meth:`verify_data` to verify and / or even change
the data returned from :meth:`assemble_data`. If you want to skip storing a
result, raise :exc:`ItemVerificationError` from here.

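For illustration, a minimal sketch of such overrides (the ``'priceitem'`` sub
key and the ``'price'`` field are made up for this example)::

    class PriceCollector(CollectItemsPipeline):

        def assemble_data(self, data, key):
            # *data* maps sub keys to lists of item data; reduce it to one
            # summary dictionary per key
            prices = [d['price'] for d in data.get('priceitem', [])]
            return {'key': key, 'prices': prices}

        def verify_data(self, data, key):
            # skip keys that ended up without any prices
            if not data['prices']:
                raise ItemVerificationError(f"no prices collected for '{key}'")
            return data
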
Data Structures
===============

This class uses two structures: :attr:`items`, which collects the items as
they come in, and :attr:`collection`, which holds the output generated from
the items.

Items
-----

The items are collected in a dictionary.

For each item a key is generated (:meth:`get_key`) that points to a list
collecting the data (:meth:`get_data`) of every item with the same *key*::

    items = {
        'key-01': [
            'item-data-01',
            'item-data-02',
            'item-data-03',
        ],
        'key-02': [
            'item-data-01',
        ],
        'key-03': [
            'item-data-01',
            'item-data-02',
        ],
    }

If a subkey is used (:meth:`get_subkey` returns something other than `None`),
this looks a little bit different::

    items = {
        'key-01': {
            'subkey-01': [
                'item-data-01',
                'item-data-02',
            ],
            'subkey-02': [
                'item-data-01',
            ],
        },
        'key-02': {
            'subkey-01': [
                'item-data-01',
            ],
            'subkey-02': [
                'item-data-01',
            ],
        },
        'key-03': {
            'subkey-02': [
                'item-data-01',
                'item-data-02',
            ],
        },
    }

Data List
~~~~~~~~~

One important part here is that the item data is always stored in a list, so
that multiple items with the same key / sub key won't interfere with each
other.

You can use the :func:`flatten` helper function to deal with this in your
code.

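For example, inside an :meth:`assemble_data` override (the ``'priceitem'`` sub
key and the ``'title'`` field are made up for this example)::

    def assemble_data(self, data, key):
        # all items stored for this key should agree on the title; `flatten`
        # returns it once (or raises ItemVerificationError if they differ)
        title = flatten(data['priceitem'], key='title')
        return {'key': key, 'title': title}
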
Collection
----------

The collection is either a dictionary or a list, depending on the
:attr:`as_list` setting. If :attr:`as_list` is set, it is a list and the
result for each key is appended to it. Otherwise it is a dictionary and each
key points to the results for that key.

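For illustration, the two shapes look roughly like this (note that in the
dictionary case :meth:`assemble_collection` wraps the result for each key in
a list)::

    # as_list = True
    collection = [
        'result-for-key-01',
        'result-for-key-02',
    ]

    # as_list = False
    collection = {
        'key-01': ['result-for-key-01'],
        'key-02': ['result-for-key-02'],
    }
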
Setup
=====

1. Create a subclass from this (e.g. in your ``pipelines.py`` file)::

       import collection_pipeline


       class MyCollector(collection_pipeline.CollectItemsPipeline):

           pass  # overwrite stuff here

2. And set it in your ``settings.py`` file::

       ITEM_PIPELINES = {
           'mypackage.pipelines.MyCollector': 800,
       }

3. You can also configure some things there::

       ITEM_COLLECTION_FILENAME = 'my_collection.json'
       ITEM_COLLECTION_DEL_KEY = True

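You can also set these per spider via Scrapy's ``custom_settings`` attribute
(a sketch, assuming a spider class of your own called ``MySpider``)::

    import scrapy


    class MySpider(scrapy.Spider):

        name = 'myspider'
        custom_settings = {
            'ITEM_COLLECTION_FILENAME': 'my_collection.json',
            'ITEM_COLLECTION_AS_LIST': True,
        }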
"""
import json

import slugify

from scrapy.exceptions import DropItem


class ItemKeyError(Exception):
    """Raised if no key can be generated for an item."""

    pass


class ItemVerificationError(Exception):
    """Raised if the verification for an item failed."""

    pass


class CollectItemsPipeline(object):
    """
    Collects items to handle them later.

    For each yielded item, :meth:`process_item` is called and handles the
    storing of items for later use.

    After all items are yielded, :meth:`close_spider` is called; it assembles
    the data from the previously collected items (:meth:`assemble_collection`)
    and dumps the results (:meth:`dump_collection`).

    You should override :meth:`assemble_data` to customize your item
    collection: it gets called for each collected key with all the info
    collected for it. And it should return the data in any way you see fit
    for the key.

    You could also overwrite :meth:`verify_data` to verify and / or even
    change the data returned from :meth:`assemble_data`. If you want to skip
    storing a result, raise :exc:`ItemVerificationError` from here.
    """

    @classmethod
    def from_crawler(cls, crawler):
        """
        Creates an instance from the *crawler* settings.

        Here is a list of the supported settings and their default values
        (see :meth:`__init__` for details)::

            ITEM_COLLECTION_FILENAME = 'data.json'
            ITEM_COLLECTION_KEY_ATTR = 'collection_index'
            ITEM_COLLECTION_SLUGIFY_KEYS = True
            ITEM_COLLECTION_DEL_KEY = False
            ITEM_COLLECTION_DROP_ITEMS = False
            ITEM_COLLECTION_AS_LIST = False
            ITEM_COLLECTION_RAISE_ON_ERROR = False
            ITEM_COLLECTION_STATS_FOR_KEY = False
            ITEM_COLLECTION_STATS_FOR_SUBKEY = False
        """
        s = crawler.settings
        filename = s.get('ITEM_COLLECTION_FILENAME', 'data.json')
        key_attr = s.get('ITEM_COLLECTION_KEY_ATTR', 'collection_index')
        # boolean settings are read with `getbool`, so they also work when
        # given as strings (e.g. from the command line)
        slugify_keys = s.getbool('ITEM_COLLECTION_SLUGIFY_KEYS', True)
        del_key = s.getbool('ITEM_COLLECTION_DEL_KEY', False)
        drop_items = s.getbool('ITEM_COLLECTION_DROP_ITEMS', False)
        as_list = s.getbool('ITEM_COLLECTION_AS_LIST', False)
        raise_on_error = s.getbool('ITEM_COLLECTION_RAISE_ON_ERROR', False)
        stats_for_key = s.getbool('ITEM_COLLECTION_STATS_FOR_KEY', False)
        stats_for_subkey = s.getbool('ITEM_COLLECTION_STATS_FOR_SUBKEY', False)
        return cls(
            crawler.stats, filename, key_attr, slugify_keys, del_key,
            drop_items, as_list, raise_on_error, stats_for_key,
            stats_for_subkey
        )

    def __init__(
        self, stats, filename, key_attr, slugify_keys=True, del_key=False,
        drop_items=False, as_list=False, raise_on_error=False,
        stats_for_key=False, stats_for_subkey=False
    ):
        """
        Configures the instance and creates data structures for storage.

        Args:
            stats: the crawler's stats collector
            filename: the results will be dumped to this path / filename
            key_attr: the attribute used for the generation of keys for items
            slugify_keys: slugify the item keys
            del_key: remove the key attribute from items
            drop_items: drop stored items instead of returning them
            as_list: use a list as collection instead of a dictionary
            raise_on_error: raise exceptions on errors
            stats_for_key: collect stats for item keys
            stats_for_subkey: collect stats for item sub keys
        """
        self.stats = stats
        self.filename = filename
        self.key_attr = key_attr
        self.slugify_keys = slugify_keys
        self.del_key = del_key
        self.drop_items = drop_items
        self.as_list = as_list  # needed by `reset`
        self.raise_on_error = raise_on_error
        self.stats_for_key = stats_for_key
        self.stats_for_subkey = stats_for_subkey
        self.reset()

    def reset(self):
        """
        Creates data structures for storage.

        Creates an empty :attr:`items` dictionary and an empty
        :attr:`collection` (a list or a dictionary, depending on
        :attr:`as_list`).
        """
        self.items = {}
        self.collection = [] if self.as_list else {}

    def process_item(self, item, spider):
        """
        Stores and returns *item*, depending on the settings.

        This method is called for each item in the pipeline. If the *item*
        should not be stored, it is returned immediately and unchanged. If it
        should be stored, it is put in the :attr:`items` dictionary under a
        generated key. Stored items are returned too, as long as
        :attr:`drop_items` is not set.

        .. important::

            If no *key* can be generated for an *item*, the item won't be
            stored. Set :attr:`raise_on_error` if you want :exc:`ItemKeyError`
            to be raised in that case.

        Raises:
            DropItem: for stored items if :attr:`drop_items` is set.
        """
        # set current spider
        self.spider = spider
        # store item?
        store_item = self.is_item_for_storage(item)
        if store_item:
            self.stats.inc_value('collected/included')
            try:
                self.store_item(item)
            except ItemKeyError as e:
                if self.raise_on_error:
                    raise
                else:
                    spider.logger.warning(e)
                    self.stats.inc_value('collected/errors')
                    self.stats.inc_value('collected/errors/missing_keys')
        else:
            self.stats.inc_value('collected/excluded')
        # return or drop item?
        if self.drop_items and store_item:
            self.stats.inc_value('collected/dropped')
            raise DropItem(f"stored <{type(item).__name__}>")
        else:
            self.stats.inc_value('collected/returned')
            return item

    def close_spider(self, spider):
        """
        Assembles the collection from the stored items and dumps it.

        This method is called after all items are collected. It calls
        :meth:`assemble_collection` to create :attr:`collection` from
        :attr:`items` and then dumps the result with :meth:`dump_collection`.
        """
        # store current spider
        self.spider = spider
        # assemble collection
        key_count = len(self.items)
        self.spider.logger.info(
            f"Assembling items for {key_count} keys…"
        )
        self.assemble_collection()
        # dump collection
        collection_count = len(self.collection)
        self.spider.logger.info(
            f"Writing {collection_count} entries to '{self.filename}'…"
        )
        self.dump_collection()

    def store_item(self, item):
        """
        Stores *item* in :attr:`items`.

        1. Generate a key for the item with :meth:`get_key`.
        2. Generate a sub key for the item with :meth:`get_subkey`.
        3. "Convert" the item to a storable format with :meth:`get_data`.
        4. Store the result under key / sub key in :attr:`items`.
        5. Update statistics.

        .. note::

            If `None` is returned for a sub key, it is not used and the item
            is stored directly under the generated key.

        .. note::

            Note that not the item itself is stored, but whatever
            :meth:`get_data` returns for it.

        Raises:
            ItemKeyError: if no key can be generated for *item*.
        """
        # store item
        key = self.get_key(item)
        is_new_key = key not in self.items
        subkey = self.get_subkey(item, key)
        data = self.get_data(item, key, subkey)
        self.store_data(data, key, subkey)
        # update stats
        self.stats.inc_value('collected/items')
        if is_new_key:
            self.stats.inc_value('collected/items/keys')
        if self.stats_for_key:
            self.stats.inc_value(f'collected/items/item/{key}')
        if self.stats_for_subkey and subkey:
            self.stats.inc_value(f'collected/items/item/{key}/{subkey}')

    def store_data(self, data, key, subkey=None):
        """
        Stores *data* in :attr:`items` under *key* / *subkey*.

        .. note::

            If *subkey* is `None`, it is not used and the item is stored
            directly under *key*.
        """
        if subkey:
            try:
                self.items[key][subkey].append(data)
            except KeyError:
                if key not in self.items:
                    self.items[key] = {}
                self.items[key][subkey] = [data]
        else:
            try:
                self.items[key].append(data)
            except KeyError:
                self.items[key] = [data]

    def assemble_collection(self):
        """
        Assembles :attr:`collection` from :attr:`items`.

        All items in :attr:`items` are passed to :meth:`assemble_data` and the
        verified results are stored in :attr:`collection`. If an
        :exc:`ItemVerificationError` is raised, that item is skipped.

        How the items are stored in :attr:`collection` depends on
        :attr:`as_list`.

        See :meth:`assemble_data` and :meth:`verify_data` for details.
        """
        for key, data in self.items.items():
            try:
                data = self.assemble_data(data, key)
                data = self.verify_data(data, key)
                if self.as_list:
                    self.collection.append(data)
                else:
                    try:
                        self.collection[key].append(data)
                    except KeyError:
                        self.collection[key] = [data]
                self.stats.inc_value('collected/collection/stored')
            except ItemVerificationError as e:
                self.spider.logger.warning(e)
                self.stats.inc_value('collected/errors')
                self.stats.inc_value('collected/errors/verification')
                self.stats.inc_value('collected/collection/skipped')
                continue

    def dump_collection(self):
        """
        Dumps the contents of :attr:`collection` as JSON to :attr:`filename`.
        """
        # write as UTF-8, since `ensure_ascii=False` may produce non-ASCII output
        with open(self.filename, 'w', encoding='utf-8') as fh:
            json.dump(
                self.collection, fh, indent=2, sort_keys=True,
                ensure_ascii=False
            )

    def is_item_for_storage(self, item):
        """
        Returns `True` if *item* should be stored, else `False`.

        The default returns `True` if :attr:`key_attr` is in *item*.
        """
        return self.key_attr in item

    def get_key(self, item):
        """
        Returns the key for *item*.

        Will use the value of the attribute of *item* referred to by
        :attr:`key_attr`. If :attr:`slugify_keys` is set, the value will be
        "slugified".

        Raises:
            ItemKeyError: if no key can be generated for the *item*.
        """
        try:
            key = item[self.key_attr]
            if self.slugify_keys:
                key = slugify.slugify(key)
            return key
        except KeyError:
            raise ItemKeyError(
                f"item has no attribute named '{self.key_attr}': {item}"
            )

    def get_subkey(self, item, key):
        """
        Returns the sub key for the *item* with *key*.

        Will use the class name of *item*. If :attr:`slugify_keys` is set,
        the value will be "slugified".

        .. note::

            If `None` is returned as a sub key, it is not used and the item
            is stored directly under the key.
        """
        subkey = type(item).__name__
        if self.slugify_keys:
            subkey = slugify.slugify(subkey)
        return subkey

    def get_data(self, item, key, subkey):
        """
        Returns *item* in a way so it is fit for storage.

        Will cast *item* to a dictionary. If :attr:`del_key` is set,
        :attr:`key_attr` is deleted from the resulting dictionary.
        """
        # always copy, so removing the key doesn't change the original item
        data = dict(item)
        if self.del_key:
            del data[self.key_attr]
        return data

    def assemble_data(self, data, key):
        """
        Returns the assembled results from *data* for *key*.

        *data* will be the contents for the key: either a dictionary with all
        the sub keys pointing to the collected data, or the stored data
        directly if no sub keys are used.

        This default implementation just returns *data* unchanged.
        """
        if isinstance(data, dict):
            # data is grouped by sub keys
            keys = ', '.join(sorted(data.keys()))
            self.spider.logger.debug(f"assembling '{key}' with {keys}.")
        else:
            # no sub keys: data is the stored list itself
            self.spider.logger.debug(f"assembling '{key}'.")
        return data

    def verify_data(self, data, key):
        """
        Returns cleaned *data* for *key*.

        This default implementation just returns *data* unchanged.

        Raises:
            ItemVerificationError: on verification errors.
        """
        self.spider.logger.debug(f"verifying '{key}'")
        return data


def flatten(source_list, key=None):
    """
    Returns the first element from *source_list*.

    This is a helper function to get elements from the stored items. If the
    *source_list* contains more than one element, all elements need to be the
    same. You can use this as a shortcut for cases where you don't expect
    multiple differing values for your items.

    If *source_list* is a list of dictionaries, you can filter it further: if
    *key* is set, only the values for the given *key* are considered.

    Raises:
        ItemVerificationError: if there are no elements (with the given *key*).
        ItemVerificationError: if not all elements are equal.

    Examples
    --------

    For this to work, a list needs to have at least one element::

        >>> flatten([1, ])
        1

    Otherwise :exc:`ItemVerificationError` is raised::

        >>> flatten([])  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ItemVerificationError: no item in source: []

    You can catch it to set a default or something. Anyway, multiple values
    are okay, as long as they are equal::

        >>> flatten([1, 1, 1])
        1

    But :exc:`ItemVerificationError` will be raised if they are different::

        >>> flatten([1, 2, 3])  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ItemVerificationError: unequal values: [1, 2, 3]

    This works the same with a list of dictionaries::

        >>> d1 = d2 = dict(foo='test 1 foo', bar='test 1 bar')
        >>> flatten([d1, d2])
        {'foo': 'test 1 foo', 'bar': 'test 1 bar'}

    And also only if they are the same::

        >>> d2 = dict(foo='test 2 foo', bar='test 2 bar')
        >>> flatten([d1, d2])  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ItemVerificationError: unequal values: ...

    And you can also filter on dictionary keys::

        >>> d1 = d2 = dict(foo='test 1 foo', bar='test 1 bar')
        >>> flatten([d1, d2], key='foo')
        'test 1 foo'
        >>> flatten([d1, d2], key='bar')
        'test 1 bar'
        >>> flatten([d1, d2], key='x')  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ItemVerificationError: key 'x' not found ...

    But you can't filter on a key with non-dictionaries::

        >>> flatten([1, 1, 1], key='foo')  # doctest: +IGNORE_EXCEPTION_DETAIL
        Traceback (most recent call last):
        ItemVerificationError: ...
    """
    assert isinstance(source_list, list)
    # get all *key* elements
    if key is None:
        values = source_list
        err_msg_empty = f"no item in source: {source_list}"
        err_msg_multi = f"unequal values: {source_list}"
    else:
        try:
            values = [d[key] for d in source_list if key in d]
        except TypeError:
            err_msg_key = 'you can only filter by key in dictionaries'
            raise ItemVerificationError(err_msg_key)
        err_msg_empty = f"key '{key}' not found in source: {source_list}"
        err_msg_multi = f"multiple unequal values for '{key}': {values}"
    # return first element
    try:
        if values.count(values[0]) == len(values):
            return values[0]
        else:
            raise ItemVerificationError(err_msg_multi)
    except IndexError:
        raise ItemVerificationError(err_msg_empty)
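

# Convenience entry point (a sketch, not part of the original pipeline API):
# running this module directly executes the doctests from `flatten` above.
if __name__ == '__main__':
    import doctest

    doctest.testmod()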