"""
There are cases where jobs can fail abruptly in such a way that Spidermon
(or any other extensions that run at the end of Scrapy) won't run.
In these situations, we won't be alerted that something happened because
Spidermon didn't run at the end, so it won't generate alerts and ScrapyCloud
also won't warn about them.
This script has the objective of helping identifying those jobs.
In order to use it (either locally or in scrapy cloud), put the following script
in your project:
.. code-block:: python
from spidermon.scripts.check_failed_jobs import CheckFailedJobs
with CheckFailedJobs(notifiers=[...]) as checker:
checker.run()
`notifiers` is a list of functions that each receive a string message and do
something with it, e.g. a Slack message sender, a different logger, etc.

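For instance, a minimal notifier (a sketch; the name `log_notifier` is just
illustrative, any callable that accepts a single string works) could simply
forward the message to a logger:

.. code-block:: python

    import logging

    def log_notifier(message):
        logging.getLogger("failed_jobs").warning(message)

    with CheckFailedJobs(notifiers=[log_notifier]) as checker:
        checker.run()
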
Then you can call your new file and pass in the arguments below (see the
example invocation after this list):

* --api-key - Scrapy Cloud API key
* --project-id - Scrapy Cloud project id
* --lookback-hours - How many hours to look back (only used with
  --no-lookback-from-last, or when no previous run of this script is found)
* --include-reported - Include jobs that were already reported by Spidermon
* --no-notify - Disables notifications
* --no-lookback-from-last - Disables using the last script run time as the
  start time for the lookback

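For example (the project id and hours are placeholders, and the file is
assumed to be saved as `check_failed_jobs.py`)::

    python check_failed_jobs.py --project-id 123456 --lookback-hours 48 --no-lookback-from-last
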
When running in Scrapy Cloud, the script will try to fetch the API key
(SH_APIKEY) from the project settings and the project id from the currently
running job. In that case, you don't need to provide these arguments.

Also, by default, the script will look for the last time it ran and only
check jobs finished since then.

The script reports as errors any jobs that have 'failed' as their close reason.

If you want to run it in Scrapy Cloud, don't forget to include the script in
your `setup.py` file, so it gets picked up and deployed.

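A minimal `setup.py` along those lines could look like the sketch below (the
package name, settings module, and `bin/` location of the script are
assumptions for illustration; the relevant part is listing the file under
`scripts` so it gets deployed):

.. code-block:: python

    from setuptools import setup, find_packages

    setup(
        name="myproject",  # assumption: your project's package name
        packages=find_packages(),
        entry_points={"scrapy": ["settings = myproject.settings"]},
        # Files listed here are deployed to Scrapy Cloud as runnable scripts,
        # e.g. this one would show up as a "py:check_failed_jobs.py" job.
        scripts=["bin/check_failed_jobs.py"],
    )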
"""
import json
import logging
import os
from argparse import ArgumentParser
from datetime import datetime, timedelta

from scrapinghub import Connection, Project

logger = logging.getLogger()


def kumo_settings():
    """Return the project/spider settings exposed by Scrapy Cloud (Dash)."""
    settings = {}
    shub_job_data = json.loads(os.environ.get("SHUB_SETTINGS", "{}"))
    if shub_job_data:
        settings.update(shub_job_data["project_settings"])
        settings.update(shub_job_data["spider_settings"])
    else:
        logger.info("Couldn't find Dash project settings, probably not running in Dash")
    return settings


class CheckFailedJobs:
    def __init__(self, notifiers=[]):
        self.API_KEY = (
            kumo_settings().get("SH_APIKEY")
            or os.environ.get("SHUB_JOBAUTH")
            or os.environ.get("SHUB_APIKEY")
        )
        self.PROJECT_ID = os.environ.get("SHUB_JOBKEY", "///").split("/")[0]
        self.notifiers = notifiers
        self.args = self.parse_args()
        self.failed_jobs = []

    def __enter__(self):
        return self

    def __exit__(self, typ, val, traceback):
        if len(self.failed_jobs) > 0:
            for job in self.failed_jobs:
                logger.error(f"Job {job.info.get('id')} has close reason 'failed'.")
                if not self.args.no_notify:
                    for notifier in self.notifiers:
                        notifier(f"Job {job.info.get('id')} has close reason 'failed'.")
        logger.info("Finished checking job failures.")
        return

    def parse_args(self):
        parser = ArgumentParser()
        parser.add_argument("--api-key", default=self.API_KEY)
        parser.add_argument("--project-id", default=self.PROJECT_ID)
        parser.add_argument(
            "--lookback-hours",
            help="How far back (in hours) the script should look for failed jobs.",
            type=int,
            default=24,
        )
        parser.add_argument(
            "--include-reported",
            help="Include jobs that were already reported by Spidermon",
            action="store_true",
        )
        parser.add_argument(
            "--no-notify",
            action="store_true",
            help="Disables calling the functions passed in the __init__ method.",
        )
        parser.add_argument(
            "--no-lookback-from-last",
            action="store_true",
            help=(
                "Disables looking for jobs that finished "
                "since this script last ran and uses --lookback-hours instead."
            ),
        )
        args = parser.parse_args()
        if not args.api_key:
            parser.error(
                "Please provide an API key with the --api-key option or set SH_APIKEY in your project settings."
            )
        if not args.project_id:
            parser.error("Please provide a project id with the --project-id option.")
        return args

    def get_failed_jobs(self):
        project = Project(Connection(self.args.api_key), self.args.project_id)
        script_name = os.path.basename(__file__)
        # Previous runs of this script tag themselves with "end_limit:<isoformat>";
        # collect those timestamps to find out when it last ran.
        last_run = sorted(
            [
                datetime.fromisoformat(tags[0].split("end_limit:")[-1])
                for job in project.jobs(spider=f"py:{script_name}", state="finished")
                if (
                    tags := [
                        tag for tag in job.info["tags"] if tag.startswith("end_limit")
                    ]
                )
            ],
            reverse=True,
        )
        if last_run and not self.args.no_lookback_from_last:
            logger.info("Looking back from the last time this script ran.")
            since_time = last_run[0]
        elif self.args.lookback_hours:
            logger.info("Looking back using a fixed number of hours.")
            since_time = datetime.utcnow() - timedelta(hours=self.args.lookback_hours)
        else:
            logger.warning(
                "No lookback set and last run not found: using default 24 hours lookback."
            )
            since_time = datetime.utcnow() - timedelta(hours=24)
        end_limit = datetime.utcnow()
        jobs = [
            job
            for job in project.jobs(state="finished")
            if since_time
            <= datetime.strptime(job.info["updated_time"], "%Y-%m-%dT%H:%M:%S")
            <= end_limit
            and job.info.get("close_reason") == "failed"
            and (
                self.args.include_reported
                or job.info.get("finish_reason") is None
            )
        ]
        # Tag the current job so future runs know where this check stopped.
        job = project.job(os.getenv("SHUB_JOBKEY"))
        job.update(add_tag=f"end_limit:{end_limit.isoformat()}")
        return jobs

    def run(self):
        self.failed_jobs = self.get_failed_jobs()


if __name__ == "__main__":
    with CheckFailedJobs() as checker:
        checker.run()