@gadelkareem, forked from hn-support/cache-warmer.py. Last active June 18, 2021.
A threaded cache warmer in Python
#!/usr/bin/env python3
"""
Warm your website's caches by crawling every page listed in sitemap.xml; sitemap index files are followed recursively.
To use, download this file and make it executable. Then run:
./cache-warmer.py --threads 4 --interval 10 --file https://example.com/sitemap.xml -v
./cache-warmer.py --threads 4 --interval 10 --file /data/web/public/sitemap.xml -v
"""
import argparse
import os.path
import re
import sys
import time
from multiprocessing.pool import ThreadPool

# Third-party dependency; install with: pip install requests
import requests

# Shared state: results collected from worker threads, and the start timestamp.
results = []
start = time.time()
USERAGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.106 Safari/537.36'


def parse_options():
    parser = argparse.ArgumentParser(description="""Cache crawler based on a sitemap.xml file""")
    parser.add_argument('-t', '--threads', help='How many threads to use', default=10, required=False, type=int)
    parser.add_argument('-i', '--interval', help='How many seconds each thread waits before a request', default=1,
                        required=False, type=int)
    parser.add_argument('-f', '--file', help='The sitemap xml file', required=True, type=str)
    parser.add_argument('-v', '--verbose', help='Be more verbose', action='store_true', default=False)
    args = parser.parse_args()
    if not args.file.startswith('http') and not os.path.isfile(args.file):
        parser.error('Could not find sitemap file %s' % args.file)
    return args


def crawl_url(url, verbose=False, interval=1):
    if verbose:
        print("Crawling {}".format(url))
    # Throttle: each worker pauses for `interval` seconds before its request.
    time.sleep(interval)
    a = requests.get(url, headers={"user-agent": USERAGENT})
    return {'exit': 0 if a.ok else 1, 'out': a.text, 'url': url}


def make_results():
    errcount = 0
    exec_time = format(time.time() - start, '.4f')
    for item in results:
        if item['exit'] == 0:
            continue
        errcount += 1
        print("Errors detected in %s:\n%s\n" % (item['url'], item['out']))
    print("=" * 50)
    if errcount == 0:
        print("All DONE! - All urls are warmed! - done in %ss" % exec_time)
        return 0
    print("%d Errors detected! - done in %ss" % (errcount, exec_time))
    return 1


def get_sitemap_urls(p):
    # Accept either a sitemap URL or a local file path.
    if p.startswith('http'):
        r = requests.get(p, headers={'User-Agent': USERAGENT})
        c = r.text
    else:
        with open(p) as fh:
            c = fh.read()
    urls = []
    if 'sitemapindex' in c:
        # Sitemap index file: recurse into every child sitemap it lists.
        sitemaps = re.findall('<loc>(.*?)</loc>', c)
        for s in sitemaps:
            urls.extend(get_sitemap_urls(s))
        return urls
    return re.findall('<loc>(.*?)</loc>', c)
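
# For reference, a sitemap index file (hypothetical URLs) looks like the snippet
# below; get_sitemap_urls() detects the 'sitemapindex' marker and recurses into
# each child <loc>:
#
#   <sitemapindex xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
#     <sitemap><loc>https://example.com/sitemap-posts.xml</loc></sitemap>
#     <sitemap><loc>https://example.com/sitemap-pages.xml</loc></sitemap>
#   </sitemapindex>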


def callback(output):
    results.append(output)


def main():
    args = parse_options()
    sitemap_urls = get_sitemap_urls(args.file)
    if args.verbose:
        print("Crawling {} urls with {} threads\n[Please Wait!]".format(len(sitemap_urls), args.threads))
        print("=" * 50)
    pool = ThreadPool(args.threads)
    for url in sitemap_urls:
        pool.apply_async(crawl_url, args=(url, args.verbose, args.interval), callback=callback)
    pool.close()
    pool.join()
    sys.exit(make_results())


if __name__ == "__main__":
    main()
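
A verbose run against a hypothetical two-URL sitemap at https://example.com/sitemap.xml might print something like this (the output lines follow the script's print statements; the timing figure is illustrative):

$ ./cache-warmer.py --threads 4 --interval 10 --file https://example.com/sitemap.xml -v
Crawling 2 urls with 4 threads
[Please Wait!]
==================================================
Crawling https://example.com/
Crawling https://example.com/about
==================================================
All DONE! - All urls are warmed! - done in 10.0412s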