Created
November 14, 2023 21:42
-
-
Save curita/a84e83b1786cd208c0deaeb526e15940 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
There are cases where jobs can fail abruptly in such a way that Spidermon | |
(or any other extensions that run at the end of Scrapy) won't run. | |
In these situations, we won't be alerted that something happened because | |
Spidermon didn't run at the end, so it won't generate alerts and ScrapyCloud | |
also won't warn about them. | |
This script has the objective of helping identifying those jobs. | |
In order to use it (either locally or in scrapy cloud), put the following script | |
in your project: | |
.. code-block:: python | |
from spidermon.scripts.check_failed_jobs import CheckFailedJobs | |
with CheckFailedJobs(notifiers=[...]) as checker: | |
checker.run() | |
`notifiers` is a list of functions that receive a string message and do something | |
with it. i.e. a slack message sender, a different logger, etc. | |
Then you can call your new file and pass in the arguments: | |
* --api-key (Scrapy Cloud API key) | |
* --project-id | |
* --lookback-hours - How many hours to look back (only used if --no-lookback-from-last) | |
* --include-reported - Include jobs that were already reported by Spidermon | |
* --no-notify - Disables notifications | |
* --no-lookback-from-last - Disables using the last script run time as the start time | |
for the lookback | |
When running in SC, the script will try to fetch the API key (SH_APIKEY) from the project | |
settings and the project id from the current running job. In that case, you don't need to | |
provide these arguments. | |
Also, by default, the script will look for the last time it ran and only check jobs since | |
then. | |
The script should report as errors any jobs that have the 'failed' as a close reason. | |
If you want to run it in scrapy cloud, don't forget to include the script in your | |
`setup.py` file, so it gets picked up and deployed. | |
""" | |
import logging | |
import json | |
from argparse import ArgumentParser | |
from datetime import datetime, timedelta | |
import os | |
from scrapinghub import Project, Connection | |
# Module-level logger named after this module (instead of the anonymous root
# logger) so records can be filtered by origin; messages still propagate to
# the root logger's handlers, so visible output is unchanged.
logger = logging.getLogger(__name__)
def kumo_settings():
    """Return merged Scrapy Cloud (Dash) settings from the environment.

    Reads the ``SHUB_SETTINGS`` JSON blob that Scrapy Cloud injects into
    jobs and merges ``project_settings`` with ``spider_settings``
    (spider-level values win). Returns an empty dict when not running in
    Scrapy Cloud.
    """
    shub_job_data = json.loads(os.environ.get("SHUB_SETTINGS", "{}"))
    if not shub_job_data:
        logger.info("Couldn't find Dash project settings, probably not running in Dash")
        return {}
    merged = dict(shub_job_data["project_settings"])
    merged.update(shub_job_data["spider_settings"])
    return merged
class CheckFailedJobs:
    """Context manager that finds Scrapy Cloud jobs whose close reason is
    'failed' and reports them on exit.

    Usage::

        with CheckFailedJobs(notifiers=[...]) as checker:
            checker.run()

    ``notifiers`` is a list of callables that each receive a string message
    (e.g. a Slack sender or an extra logger). Reporting happens in
    ``__exit__``: every failed job is logged as an error and, unless
    ``--no-notify`` was passed, forwarded to each notifier.
    """

    def __init__(self, notifiers=None):
        # Fix: the original used a mutable default argument (notifiers=[]),
        # which is shared across all instances; copy the caller's list so
        # later mutations on either side don't leak.
        self.notifiers = list(notifiers) if notifiers else []
        # API key resolution order: project settings, then the job auth
        # token, then the SHUB_APIKEY environment variable.
        self.API_KEY = (
            kumo_settings().get("SH_APIKEY")
            or os.environ.get("SHUB_JOBAUTH")
            or os.environ.get("SHUB_APIKEY")
        )
        # SHUB_JOBKEY has the form "<project_id>/<spider_id>/<job_id>";
        # the "///" fallback yields an empty project id outside Scrapy Cloud.
        self.PROJECT_ID = os.environ.get("SHUB_JOBKEY", "///").split("/")[0]
        self.args = self.parse_args()
        # Populated by run(); read back in __exit__.
        self.failed_jobs = []

    def __enter__(self):
        return self

    def __exit__(self, typ, val, traceback):
        # Report everything run() collected. Returning None (falsy) means
        # exceptions raised inside the `with` body are never suppressed.
        for job in self.failed_jobs:
            message = f"Job {job.info.get('id')} has close reason 'failed'."
            logger.error(message)
            if not self.args.no_notify:
                for notifier in self.notifiers:
                    notifier(message)
        logger.info("Finished checking job failures.")
        return

    def parse_args(self):
        """Parse CLI options, defaulting credentials from the environment.

        Exits with an argparse error when no API key or project id can be
        resolved from either the CLI or the Scrapy Cloud environment.
        """
        parser = ArgumentParser()
        parser.add_argument("--api-key", default=self.API_KEY)
        parser.add_argument("--project-id", default=self.PROJECT_ID)
        parser.add_argument(
            "--lookback-hours",
            help="How far back (in hours) the script should look for failed jobs.",
            type=int,
            default=24,
        )
        parser.add_argument(
            "--include-reported",
            help="Include jobs that were already reported by Spidermon",
            action="store_true",
        )
        parser.add_argument(
            "--no-notify",
            action="store_true",
            help="Disables calling the functions passed in the __init__ method.",
        )
        parser.add_argument(
            "--no-lookback-from-last",
            action="store_true",
            help=(
                "Disables looking for jobs that finished "
                # Fix: the help text referenced a nonexistent
                # "--lookback_hours" spelling; the real flag uses dashes.
                "since this script last ran and uses --lookback-hours instead."
            ),
        )
        args = parser.parse_args()
        if not args.api_key:
            parser.error(
                "Please provide an API key with the --api-key option or set SH_APIKEY in your project settings."
            )
        if not args.project_id:
            parser.error("Please provide a project id with the --project-id option.")
        return args

    def get_failed_jobs(self):
        """Return finished jobs with close_reason 'failed' in the lookback window.

        The window starts either at the most recent previous run of this
        script (found via an "end_limit:<iso timestamp>" tag it leaves on
        its own job) or ``--lookback-hours`` ago, and ends now (UTC).
        """
        project = Project(Connection(self.args.api_key), self.args.project_id)
        script_name = os.path.basename(__file__)
        # Collect the end_limit timestamps of previous runs, newest first.
        last_run = sorted(
            [
                datetime.fromisoformat(tags[0].split("end_limit:")[-1])
                for job in project.jobs(spider=f"py:{script_name}", state="finished")
                if (
                    tags := [
                        tag for tag in job.info["tags"] if tag.startswith("end_limit")
                    ]
                )
            ],
            reverse=True,
        )
        if last_run and not self.args.no_lookback_from_last:
            logger.info("Looking back from the last time this script ran.")
            since_time = last_run[0]
        elif self.args.lookback_hours:
            logger.info("Looking back using a fixed number of hours.")
            since_time = datetime.utcnow() - timedelta(hours=self.args.lookback_hours)
        else:
            # Only reachable when --lookback-hours is explicitly set to 0.
            logger.warning(
                "No lookback set and last run not found: using default 24 hours lookback."
            )
            since_time = datetime.utcnow() - timedelta(hours=24)
        # Naive UTC timestamps throughout, matching the naive datetimes
        # parsed from the API's updated_time strings.
        end_limit = datetime.utcnow()

        def updated_at(job):
            # Hoisted: the original parsed updated_time twice per job.
            return datetime.strptime(job.info["updated_time"], "%Y-%m-%dT%H:%M:%S")

        jobs = [
            job
            for job in project.jobs(state="finished")
            if since_time <= updated_at(job) <= end_limit
            and job.info.get("close_reason") == "failed"
            and (
                self.args.include_reported
                # NOTE(review): presumably Spidermon sets finish_reason on
                # jobs it already reported — confirm against the monitors.
                or job.info.get("finish_reason") is None
            )
        ]
        # Tag the current job so the next run can resume from end_limit.
        # Fix: only possible inside Scrapy Cloud — the original crashed on
        # local runs where SHUB_JOBKEY is unset (project.job(None)).
        job_key = os.getenv("SHUB_JOBKEY")
        if job_key:
            current_job = project.job(job_key)
            current_job.update(add_tag=f"end_limit:{end_limit.isoformat()}")
        return jobs

    def run(self):
        """Collect failed jobs; __exit__ reports them when the context closes."""
        self.failed_jobs = self.get_failed_jobs()
if __name__ == "__main__": | |
with CheckFailedJobs() as job: | |
job.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment