@iAnanich
Created February 1, 2018 14:39
ScrapingHub tool kit
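# ============
# __init__.py
# ============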
from .constants import *
from .funcs import *
from .manager import ScrapinghubManager, ManagerDefaults
from .fetcher import SHubFetcher
from .job import JobKey, JobSummary

This gist presents part of my Scrapy NTK library: the scraping_hub package.

In particular, this package can help you communicate with the ScrapingHub API. It provides a set of classes built for fetching items from the cloud.
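For instance, here is a minimal usage sketch (assuming the library is importable as `scrapy_ntk.scraping_hub`; the API key, project ID, spider name and excluded job numbers below are placeholders):

from scrapy_ntk.scraping_hub import SHubFetcher

fetcher = SHubFetcher(
    settings={
        'your_32_char_API_key': {          # Scrapinghub API key
            274629: {                      # project ID
                'spider001': [305, 301],   # spider name -> job numbers to exclude
            }
        }
    },
    maximum_excluded_matches=2,
)

# iterate over the items of every fetched job
for item in fetcher.fetch_items():
    print(item)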

Note: some of the imported modules are not included here, but you can find them in my library or in the list of my gists.
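# ============
# constants.py
# ============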

import re
__all__ = (
'JOBKEY_SEPARATOR', 'JOBKEY_PATTERN',
'META_STATE', 'META_STATE_FINISHED',
'META_CLOSE_REASON', 'META_CLOSE_REASON_FINISHED',
'META', 'META_KEY', 'META_ITEMS', 'META_SPIDER',
)
JOBKEY_SEPARATOR = '/'
JOBKEY_PATTERN = re.compile(r'\d+{sep}\d+{sep}\d+'.format(sep=JOBKEY_SEPARATOR))
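# e.g. a full job key looks like '274629/1/305' (project ID / spider ID / job number)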
# ============
# meta
# ============
META = 'meta'
META_KEY = 'key'
META_ITEMS = 'items'
META_SPIDER = 'spider'
META_STATE = 'state'
META_STATE_FINISHED = 'finished'
META_CLOSE_REASON = 'close_reason'
META_CLOSE_REASON_FINISHED = 'finished'
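# ============
# fetcher.py
# ============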
import logging
from typing import Iterator, Iterable, Tuple, Dict, List, Union
from functools import partial
from scrapinghub import ScrapinghubClient as Client
from scrapinghub.client.jobs import Job
from scrapinghub.client.projects import Project
from scrapinghub.client.spiders import Spider
from .constants import (
META_KEY, META_ITEMS, META,
META_CLOSE_REASON_FINISHED, META_CLOSE_REASON,
META_STATE, META_STATE_FINISHED,
)
from .funcs import spider_id_to_name
from .manager import ScrapinghubManager
from .job import JobKey, JobSummary
from ..utils.counter import Threshold
from ..utils.iter_manager import IterManager, BaseContext
JobNumIter = Iterator[int]
JobKeyIter = Iterator[str]
JobIter = Iterator[Job]
ItemIter = Iterator[dict]
LogIter = Iterator[dict]
SettingsInputType = Dict[
str, # API key
Dict[
int, # Project ID
Dict[
Union[str, int], # Spider name or ID
Iterable[int], # Iterable over excluded job numbers
]
]
]
SpidersTuple = Tuple[
Tuple[Spider, JobNumIter]
]
ProjectsTuple = Tuple[
Tuple[Project, SpidersTuple]
]
ProcessedSettingsType = ClientsTuple = Tuple[
Tuple[Client, ProjectsTuple]
]
class SHubFetcher:
def __init__(self, settings: SettingsInputType, *,
maximum_fetched_jobs: int or None =None,
maximum_excluded_matches: int or None =None,
maximum_returned_jobs: int or None =None,
maximum_total_excluded: int or None =None,
logger: logging.Logger=None):
"""
For example, suppose you have the API key
`1234567887654321123567887654321`, project `274629` and a spider named
`spider001` whose ID is `1`:
>>> f = SHubFetcher(
...     settings={
...         '1234567887654321123567887654321': {
...             274629: {
...                 'spider001': (x for x in [305, 301, 300]),
...             }
...         }
...     },
...     maximum_excluded_matches=2, )
>>> f.fetch_jobs()
:param settings: see `SettingsInputType`
:param maximum_fetched_jobs: upper bound on how many job summaries to iterate
:param maximum_excluded_matches: how many job numbers (the last part of a
job key) from the exclude iterator must match in a row to stop iteration
:param maximum_returned_jobs: upper bound on how many jobs to yield
:param maximum_total_excluded: upper bound on the total number of excluded jobs
"""
if logger is None:
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
self.logger = logger
try:
# it will check their values
Threshold(maximum_fetched_jobs)
Threshold(maximum_excluded_matches)
except TypeError as exc:
msg = f'Wrong `maximum_*` type: {str(exc)}'
self.logger.exception(msg)
raise TypeError(msg) from None
self.maximum_excluded_matches = maximum_excluded_matches
self.maximum_fetched_jobs = maximum_fetched_jobs
self.maximum_returned_jobs = maximum_returned_jobs
self.maximum_total_excluded = maximum_total_excluded
self.settings = self.process_settings(settings)
@classmethod
def from_shub_defaults(cls, shub: ScrapinghubManager):
# use empty list to get all jobs
iterable = list()
settings = {
shub.defaults.api_key: {
shub.defaults.project_id: {
shub.defaults.spider_id: iterable,
}
}
}
logger = shub.logger
new = cls(settings=settings, logger=logger)
return new
@classmethod
def new_helper(cls):
logger = logging.getLogger('SHubFetcher: ScrapinghubManager helper')
logger.setLevel(logging.ERROR)
shub = ScrapinghubManager(lazy_mode=True, logger=logger)
return shub
@classmethod
def process_settings(cls, settings: SettingsInputType) -> ProcessedSettingsType:
helper = cls.new_helper()
processed: List[Tuple[Client, ProjectsTuple]] = list()
for api_key, projects in settings.items():
if not isinstance(api_key, str):
raise TypeError(
f'API key must be a string, got {type(api_key)} instead.')
helper.switch_client(api_key)
processed_projects: List[Tuple[Project, SpidersTuple]] = list()
for project_id, spiders in projects.items():
if not isinstance(project_id, int):
raise TypeError(
f'project ID must be an integer, '
f'got {type(project_id)} instead.')
helper.switch_project(project_id)
processed_spiders: List[Tuple[Spider, Iterator[int]]] = list()
for spider_name_or_id, exclude_iterable in spiders.items():
if isinstance(spider_name_or_id, str):
spider_name = spider_name_or_id
elif isinstance(spider_name_or_id, int):
spider_name = spider_id_to_name(
spider_name_or_id, helper.project)
else:
raise TypeError(
f'Spider name or ID must be a string or an integer, '
f'got {type(spider_name_or_id)} instead.')
# process spider name or ID
helper.switch_spider(spider_name)
# process exclude
exclude_list = [int(i) for i in exclude_iterable] # type-check
exclude_list.sort(reverse=True) # sort, to get bigger numbers first
exclude_iterator = iter(exclude_list)
processed_spiders.append((helper.spider, exclude_iterator, ))
processed_spiders: SpidersTuple = tuple(processed_spiders)
processed_projects.append((helper.project, processed_spiders, ))
processed_projects: ProjectsTuple = tuple(processed_projects)
processed.append((helper.client, processed_projects, ))
processed: ClientsTuple = tuple(processed)
return processed
iter_job_summaries = staticmethod(partial(
JobSummary.iter_from_spider,
params={
META_STATE: META_STATE_FINISHED,
META : [META_KEY, META_CLOSE_REASON, META_ITEMS],
}
))
def latest_spiders_jobkeys(self, spider: Spider,
exclude_iterator: JobNumIter) -> JobKeyIter:
"""
Fetches the latest jobs of the given spider and yields their keys.
:param spider: `Spider` instance
:param exclude_iterator: object that yields job numbers that you do not
want to get from this method
:return: iterator that yields job keys
"""
def context_processor(value: JobSummary, context_type: type) -> BaseContext:
ctx = context_type(value=value, exclude_value=value.jobkey.job_num)
return ctx
def before_finish(ctx: BaseContext):
self.logger.info(
f'Finished on {ctx.value.jobkey.job_num} job number '
f'with close reason: "{ctx.close_reason}".')
def return_jobkey(ctx: BaseContext) -> JobKey:
job_summary: JobSummary = ctx.value
return job_summary.jobkey
def unsuccessful_job(ctx: BaseContext) -> bool:
if not ctx.value.was_successful:
self.logger.error(
f'job with {ctx.value.jobkey} key finished unsuccessfully.')
return True
else:
return False
def empty_job(ctx: BaseContext) -> bool:
if ctx.value.items < 1:
self.logger.info(
f'job with {ctx.value.jobkey} key has no items.')
return True
else:
return False
iter_manager = IterManager(
general_iterator=self.iter_job_summaries(spider),
value_type=JobSummary,
return_value_processor=return_jobkey,
return_type=JobKey,
exclude_iterator=exclude_iterator,
exclude_value_type=int,
exclude_default=0,
max_iterations=self.maximum_fetched_jobs,
max_exclude_strike=self.maximum_excluded_matches,
max_returned_values=self.maximum_returned_jobs,
max_total_excluded=self.maximum_total_excluded,
before_finish=before_finish,
context_processor=context_processor,
case_processors=(unsuccessful_job, empty_job),
)
self.logger.info(f'Ready to fetch jobs for {spider.key} spider.')
yield from iter_manager
def latest_spiders_jobs(self, spider: Spider,
exclude_iterator: JobNumIter) -> JobIter:
for jobkey in self.latest_spiders_jobkeys(spider, exclude_iterator):
yield spider.jobs.get(job_key=str(jobkey))
def iter_spider_exclude_tuple(self) -> Iterator[Tuple[Spider, JobNumIter]]:
for client, projects in self.settings:
for project, spiders in projects:
yield from spiders
def fetch_jobs(self) -> JobIter:
for spider, exclude in self.iter_spider_exclude_tuple():
yield from self.latest_spiders_jobs(spider, exclude)
def fetch_jobkeys(self) -> JobKeyIter:
for spider, exclude in self.iter_spider_exclude_tuple():
yield from self.latest_spiders_jobkeys(spider, exclude)
def fetch_items(self) -> ItemIter:
for job in self.fetch_jobs():
yield from job.items.iter()
def fetch_logs(self) -> LogIter:
for job in self.fetch_jobs():
yield from job.logs.iter()
def fetch(self, *, jobkey=False, job=False, items=False, logs=False) -> Iterator[dict]:
if not any([job, jobkey, items, logs]):
raise ValueError('At least one of `jobkey`, `job`, `items`, `logs` must be True.')
for job_obj in self.fetch_jobs():
job_obj: Job
result = dict()
if jobkey:
result['jobkey'] = job_obj.key
if job:
result['job'] = job_obj
if items:
result['items'] = job_obj.items
if logs:
result['logs'] = job_obj.logs
yield result
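# ============
# funcs.py
# ============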
from typing import Iterator, Tuple
from scrapinghub.client.projects import Project
from scrapinghub.client.spiders import Spider
from scrapinghub.client.exceptions import NotFound
from .constants import JOBKEY_SEPARATOR
__all__ = (
'shortcut_api_key',
'spider_name_to_id', 'spider_id_to_name',
'spider_from_id', 'spider_from_name',
)
def shortcut_api_key(api_key: str, margin: int =4) -> str:
"""
Hides most of the API key for security reasons.
:param api_key: string representing API key.
:param margin: number of characters of the given `api_key` string to show on
the start and the end.
:return: shortcut API key
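For example, with the default margin of 4 (the middle character is a
U+2026 ellipsis):
>>> shortcut_api_key('1234567887654321123567887654321')
'1234…4321'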
"""
middle = '\u2026'
return f'{api_key[:margin]}{middle}{api_key[-margin:]}'
def spider_name_to_id(spider_name: str, project: Project) -> int:
spider: Spider = project.spiders.get(spider_name)
project_id_str, spider_id_str = spider.key.split(JOBKEY_SEPARATOR)
return int(spider_id_str)
def spider_id_to_name(spider_id: int, project: Project) -> str:
for spider_dict in project.spiders.list():
name = spider_dict['id']
spider: Spider = project.spiders.get(name)
project_id_str, spider_id_str = spider.key.split(JOBKEY_SEPARATOR)
if spider_id == int(spider_id_str):
return name
else:
raise NotFound(f'No such spider with {spider_id} ID found')
def spider_from_name(spider_name: str, project: Project) -> Spider:
return project.spiders.get(spider_name)
def spider_from_id(spider_id: int, project: Project) -> Spider:
return project.spiders.get(spider_id_to_name(spider_id, project))
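# ============
# job.py
# ============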
import re
import typing
from scrapinghub.client.spiders import Spider
from .constants import (
JOBKEY_SEPARATOR, JOBKEY_PATTERN,
META_STATE, META_STATE_FINISHED,
META_CLOSE_REASON, META_CLOSE_REASON_FINISHED,
META_ITEMS, META_KEY, META_SPIDER, META,
)
from ..utils.check import check_obj_type
class JobKey:
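"""Represents a Scrapinghub job key of the form 'project_id/spider_id/job_num'.

A short usage sketch:
>>> key = JobKey('274629/1/305')
>>> key.as_tuple()
(274629, 1, 305)
>>> str(JobKey(274629, 1, 305))
'274629/1/305'
"""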
AsTupleType = typing.Tuple[int, int, int]
AsDictType = typing.Dict[str, int]
separator = JOBKEY_SEPARATOR
pattern = JOBKEY_PATTERN
keys = ('project_id', 'spider_id', 'job_num')
def __init__(self, *args):
if len(args) == 1 and isinstance(args[0], str):
string = args[0]
project_id, spider_id, job_num = self.parse(string)
elif len(args) == 3 and all(isinstance(arg, int) for arg in args):
project_id, spider_id, job_num = args
string = self.concatenate(project_id, spider_id, job_num)
else:
raise ValueError('JobKey accepts either a single job key string or three integers.')
self._string = string
self._project_id = project_id
self._spider_id = spider_id
self._job_num = job_num
@classmethod
def concatenate(cls, project_id: int, spider_id: int, job_num: int) -> str:
return cls.separator.join(str(i) for i in (project_id, spider_id, job_num))
@classmethod
def parse(cls, string: str) -> AsTupleType:
if re.fullmatch(cls.pattern, string):
# we know that there are only 3 elements because of the pattern match
elements: cls.AsTupleType = tuple(
int(s) for s in string.split(JOBKEY_SEPARATOR))
for i, item, name in zip(range(3), elements, cls.keys):
check_obj_type(item, int, f'Item #{i} (for "{name}")')
assert item > 0
return elements
else:
raise ValueError(f'{string!r} does not match the job key pattern.')
@classmethod
def from_string(cls, string: str) -> 'JobKey':
return JobKey.from_tuple(cls.parse(string))
@classmethod
def from_tuple(cls, tupl: AsTupleType) -> 'JobKey':
return JobKey(*tupl)
@classmethod
def from_dict(cls, dictionary: AsDictType) -> 'JobKey':
return JobKey(*(dictionary[k] for k in cls.keys))
def as_tuple(self) -> AsTupleType:
return self._project_id, self._spider_id, self._job_num
def as_dict(self) -> AsDictType:
return {k: v for k, v in zip(self.keys, self.as_tuple())}
def as_string(self) -> str:
return self._string
@property
def project_id(self) -> int:
return self._project_id
@property
def spider_id(self) -> int:
return self._spider_id
@property
def job_num(self) -> int:
return self._job_num
def __iter__(self) -> typing.Iterator[int]:
yield from self.as_tuple()
def __repr__(self):
return f'<JobKey {self.as_string()}>'
def __str__(self):
return self.as_string()
class JobSummary:
def __init__(self, dictionary: typing.Dict[str, typing.Union[str, int]]):
try:
assert META_KEY in dictionary
assert dictionary[META_STATE] == META_STATE_FINISHED # checks if job was finished
assert META_CLOSE_REASON in dictionary
except AssertionError as exc:
raise ValueError from exc
self._dictionary = dictionary
def get(self, key: str, default=None):
return self._dictionary.get(key, default)
def __getitem__(self, item: str):
return self._dictionary[item]
def __contains__(self, item):
return item in self._dictionary
@property
def jobkey(self) -> JobKey:
return JobKey.from_string(self._dictionary[META_KEY])
@property
def close_reason(self) -> str:
return self._dictionary[META_CLOSE_REASON]
@property
def state(self) -> str:
return self._dictionary[META_STATE]
@property
def items(self) -> int:
return self._dictionary.get(META_ITEMS, 0)
@property
def spider_name(self) -> str:
return self._dictionary[META_SPIDER]
@property
def was_successful(self) -> bool:
return self._dictionary[META_CLOSE_REASON] == META_CLOSE_REASON_FINISHED
@classmethod
def iter_from_spider(cls, spider: Spider, params: dict) \
-> typing.Iterator['JobSummary']:
for job_dict in spider.jobs.iter(**params):
yield cls(job_dict)
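# ============
# manager.py
# ============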
import logging
from typing import Dict, Tuple
from functools import partial
from scrapinghub import ScrapinghubClient as Client
from scrapinghub.client.projects import Project
from scrapinghub.client.spiders import Spider
from .funcs import shortcut_api_key, spider_id_to_name
from ..utils.check import check_obj_type, raise_or_none
_logger = logging.getLogger('ScrapingHub interface')
_logger.setLevel(logging.DEBUG)
class ManagerDefaults:
API_KEY = 'api_key'
PROJECT_ID = 'project_id'
SPIDER_ID = 'spider_id'
SPIDER_NAME = 'spider_name'
@classmethod
def key_type_dict(cls) -> Tuple[Dict[str, type], ...]:
return (
{
cls.API_KEY: str,
},
{
cls.PROJECT_ID: int,
},
{
cls.SPIDER_ID: int,
cls.SPIDER_NAME: str,
},
)
@classmethod
def keys_tuple(cls) -> tuple:
keys = []
for d in cls.key_type_dict():
keys += list(d.keys())
return tuple(keys)
def __init__(self, api_key: str =None,
project_id: int =None,
spider_id: int =None,
spider_name: str =None,
logger: logging.Logger =None):
if logger is None:
logger = _logger
self.logger = logger
input_kwargs = {
self.API_KEY: api_key,
self.PROJECT_ID: project_id,
self.SPIDER_ID: spider_id,
self.SPIDER_NAME: spider_name,
}
self._config = self.check_conf({k: v for k, v in input_kwargs.items() if v is not None})
def __getitem__(self, item: str):
if item in self.keys_tuple():
try:
return self._config[item]
except KeyError:
raise KeyError(
f'Given {item} key not found in defaults.'
) from None
else:
raise KeyError(
f'{item} defaults key is not supported.'
) from None
def check_conf(self, config: dict) -> dict:
processed = dict()
for type_dict in self.key_type_dict():
raise_: dict = None
break_ = True
for key, expected_type in type_dict.items():
try:
value = config[key]
break_ = False
except KeyError:
break_ = True
continue  # this key was not provided; skip the assignment below
else:
if not isinstance(value, expected_type):
raise_ = {
'key': key,
'value': value,
'value_type': type(value),
'expected_type': expected_type,
}
break
processed[key] = value
if raise_:
msg = str(
f'Config var with {raise_["key"]} key has invalid type: '
f'{raise_["expected_type"]} expected, got {raise_["value_type"]}.')
self.logger.error(msg)
raise TypeError(msg)
if break_:
break
return processed
@raise_or_none(KeyError)
def client(self, api_key: bool =True) -> str:
if api_key:
return self[self.API_KEY]
else:
raise ValueError
@raise_or_none(KeyError)
def project(self, id_: bool =True) -> int:
if id_:
return self[self.PROJECT_ID]
else:
raise ValueError
@raise_or_none(KeyError)
def spider(self, *, id_: bool =True, name: bool =False) -> int or str:
if id_ and name:
raise ValueError(
f'Only spider\'s name or ID can be returned.'
)
elif id_:
return self[self.SPIDER_ID]
elif name:
return self[self.SPIDER_NAME]
else:
raise ValueError(
f'`id_` or `name` key-word arguments must be `True`.'
)
api_key = property(partial(client, api_key=True, raise_=False))
project_id = property(partial(project, id_=True, raise_=False))
spider_id = property(partial(spider, id_=True, name=False, raise_=False))
spider_name = property(partial(spider, id_=False, name=True, raise_=False))
class ScrapinghubManager:
shortcut_api_key = staticmethod(shortcut_api_key)
def __init__(self, *, lazy_mode: bool =False,
defaults: ManagerDefaults or None =None,
default_conf: dict or None =None,
initial_conf: dict or None =None,
logger: logging.Logger = None):
"""
:param lazy_mode: if turned on, lets the object have unset entities;
they will be set only when needed.
:param defaults: a `ManagerDefaults` instance with default values.
:param default_conf: dictionary used to build a `ManagerDefaults`
instance when `defaults` is not given.
:param initial_conf: dictionary for `switch` method.
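A minimal sketch (all values below are placeholders):
>>> shub = ScrapinghubManager(
...     lazy_mode=True,
...     default_conf={'api_key': 'your_32_char_API_key',
...                   'project_id': 274629,
...                   'spider_name': 'spider001'})
>>> shub.switch_client()  # connects using the default API key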
"""
if logger is None:
logger = _logger
self.logger = logger
if defaults is None and default_conf is not None:
defaults = ManagerDefaults(**default_conf, logger=self.logger)
self.defaults = defaults
self._is_lazy = lazy_mode
# reset client, project and spider to `unset` value
self.reset_client(stateless=True)
if initial_conf:
self.switch(**initial_conf)
elif not lazy_mode:
# call below must start chain of `switch_` calls
self.switch_client()
@property
def unset(self):
return None
@property
def is_lazy(self) -> bool:
return self._is_lazy
def set_defaults(self, defaults: ManagerDefaults):
check_obj_type(defaults, ManagerDefaults, 'Manager defaults')
self.defaults = defaults
def __repr__(self):
return f'<{self.__class__.__name__}>'
"""
Entity properties that return instances of the `scrapinghub` library's
`Spider`, `Project` and `Client` classes.
"""
@property
def spider(self) -> Spider:
spider = self._spider
if spider is not self.unset:
return spider
elif not self._is_lazy:
return self.switch_spider()
else:
raise ValueError('`spider` is not set yet.')
@property
def project(self) -> Project:
project = self._project
if project is not self.unset:
return project
elif not self._is_lazy:
return self.switch_project()
else:
raise ValueError('`project` is not set yet.')
@property
def client(self) -> Client:
client = self._client
if client is not self.unset:
return client
elif not self._is_lazy:
return self.switch_client()
else:
raise ValueError('`client` is not set yet.')
"""
`_switch_*` methods call the `get_*` method, assign the value and log it.
"""
def _switch_spider(self, spider_name: str) -> Spider:
spider = self.get_spider(spider_name)
self._spider = spider
self.logger.info(
f'Spider switched to "{spider_name}" ({spider.key}).')
return spider
def _switch_project(self, project_id: int) -> Project:
project = self.get_project(project_id)
self._project = project
self.logger.info(
f'Project switched to #{project_id}.')
return project
def _switch_client(self, api_key: str) -> Client:
client = self.get_client(api_key)
self._client = client
self.logger.info(
f'Client switched by {self.shortcut_api_key(api_key)} API key.')
return client
"""
`switch_*` methods check whether the required parent entity is unset and,
if so, raise ValueError; otherwise they check whether the given argument
is `None` and, if so, use the default key. In either case they call the
`_switch_*` method with that key.
"""
def switch_spider(self, spider_name: str or None =None) -> Spider:
if self.project is self.unset:
raise ValueError(f'Can not change `spider` while '
f'`project` is not set (=`{self.unset}`)')
if spider_name is None:
spider_name = self.defaults.spider_name
if spider_name is None:
spider_id = self.defaults.spider_id
if spider_id is None:
msg = str(
f'Trying to switch to default spider, '
f'but no spider-related data found in defaults.'
)
self.logger.error(msg)
raise RuntimeError(msg)
spider_name = spider_id_to_name(spider_id, self.project)
spider = self._switch_spider(spider_name)
return spider
def switch_project(self, project_id: int or None =None) -> Project:
if self.client is self.unset:
raise ValueError(f'Can not change `project` while '
f'`client` is not set (=`{self.unset}`)')
if project_id is None:
project_id = self.defaults.project_id
project = self._switch_project(project_id)
self.reset_spider()
return project
def switch_client(self, api_key: str or None =None) -> Client:
if api_key is None:
api_key = self.defaults.api_key
client = self._switch_client(api_key)
self.reset_project()
return client
def switch(self, **kwargs):
if 'api_key' in kwargs:
self.switch_client(kwargs['api_key'])
if 'project_id' in kwargs:
self.switch_project(kwargs['project_id'])
if 'spider_name' in kwargs:
self.switch_spider(kwargs['spider_name'])
"""
`reset_*` methods check the `stateless` flag (and lazy mode) and, if set,
call the `drop_*` method; otherwise they call the `switch_*` method with
`None` as the only argument, which means switching to the default value.
"""
def reset_spider(self, stateless: bool =False):
if self._is_lazy or stateless:
self.drop_spider()
else:
self.switch_spider(None)
def reset_project(self, stateless: bool =False):
if self._is_lazy or stateless:
self.drop_project()
else:
self.switch_project(None)
def reset_client(self, stateless: bool =False):
if self._is_lazy or stateless:
self.drop_client()
else:
self.switch_client(None)
"""
`_drop_*` methods set the entity to the `unset` value and log it.
"""
def _drop_spider(self):
self._spider = self.unset
self.logger.info(f'Spider dropped.')
def _drop_project(self):
self._project = self.unset
self.logger.info(f'Project dropped.')
def _drop_client(self):
self._client = self.unset
self.logger.info(f'Client dropped.')
"""
`drop_*` methods must call the `_drop_*` method and reset the entities
that depend on it.
"""
def drop_spider(self):
self._drop_spider()
def drop_project(self):
self._drop_project()
self.reset_spider(stateless=True)
def drop_client(self):
self._drop_client()
self.reset_project(stateless=True)
"""
`get_*` methods take an identifier of the entity, retrieve it, and return
it. Nothing else; they are plain instance methods.
"""
def get_spider(self, spider_name: str) -> Spider:
return self.project.spiders.get(str(spider_name))
def get_project(self, project_id: int) -> Project:
return self.client.get_project(int(project_id))
def get_client(self, api_key: str) -> Client:
return Client(str(api_key))