Skip to content

Instantly share code, notes, and snippets.

@freeatnet
Last active December 10, 2015 21:38
Show Gist options
  • Save freeatnet/4496606 to your computer and use it in GitHub Desktop.
Save freeatnet/4496606 to your computer and use it in GitHub Desktop.
Resolver.py -- a simple class that lets you get to the canonical URL by following redirects, meta refresh tags, and rel="canonical" tags. Written for Python 2.7 using the requests library. Beta-quality at the moment. Inspired by https://github.com/scottwater/unwind.
import logging
import re
import urlparse
import requests
class Resolver(object):
    """Resolve a URL to its canonical form.

    Starting from ``original_url``, follows HTTP redirects, ``<meta>``
    refresh tags and ``<link rel="canonical">`` tags.

    Attributes:
        original_url: The URL passed to the constructor.
        final_url: Best known resolved URL; equals ``original_url`` until a
            call to ``resolve()`` updates it.
        response: The last ``requests`` response received, or ``None``.
        history: Meta-refresh targets followed during the current
            ``resolve()`` call; used to break refresh loops.
    """

    re_canonical = re.compile('<link rel=[\'\"]canonical[\'\"] href=[\'\"](.*?)[\'\"]', re.IGNORECASE)
    # Accepts optional whitespace after ';' and an optional quote around the
    # URL, so both content="0; url=http://x" and content="0;url='http://x'"
    # are recognised.
    re_redirect = re.compile(r'<meta[^>]*?;\s*url=\s*[\'"]?(.*?)["\']', re.IGNORECASE)

    # Why a Chrome User-Agent? Because people will apparently block other agents if they look like scripts
    user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_8_2) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.101 Safari/537.11"
    # Per-request timeout in seconds.
    timeout = 5.0
    # Maximum number of attempts for a single URL before giving up.
    max_attempts = 100
    # NOTE(security): TLS certificate verification is disabled, as in the
    # original implementation. Set to True when resolving untrusted URLs.
    verify = False

    def __init__(self, url):
        """Create a resolver for *url*; no request is made until resolve()."""
        self.history = []
        self.original_url = url
        self.response = None
        self.final_url = url

    def resolve(self):
        """Resolve ``original_url`` and return the final URL.

        The result is also stored in ``final_url``.

        Raises:
            requests.exceptions.RequestException: if a request keeps failing
                after ``max_attempts`` attempts.
            requests.exceptions.HTTPError: on a 400-404 status, on a 405
                response to GET, or on a persistent 5xx status.
        """
        logging.debug("Resolving %s", self.original_url)
        # Start each resolution with a clean loop-detection history.
        self.history = []
        return self.__resolve(self.original_url)

    def __resolve(self, from_url, request_verb='head', retry_count=0):
        """Request *from_url* and follow whatever it points at.

        Args:
            from_url: URL to request.
            request_verb: Name of the ``requests`` function to use
                (``'head'`` or ``'get'``).
            retry_count: Number of attempts already made for this URL.

        Returns:
            The value of ``final_url`` after resolution.
        """
        retry_count += 1
        logging.debug("Making a %s request to %s", request_verb, from_url)
        try:
            request_method = getattr(requests, request_verb)
            r = request_method(from_url, allow_redirects=True,
                               verify=self.verify,
                               headers={"User-Agent": self.user_agent},
                               timeout=self.timeout)
            self.response = r
        except requests.exceptions.RequestException as e:
            logging.error("Resolving %s caused exception", from_url)
            logging.error(e)
            if retry_count >= self.max_attempts:
                raise
            # TODO: Needs a backoff?
            logging.error("Retrying (%i-th time)...", retry_count)
            # Return here: there is no response object to inspect below, and
            # the retry must keep using the same verb.
            return self.__resolve(from_url, request_verb=request_verb,
                                  retry_count=retry_count)

        status = r.status_code
        if 300 <= status < 400:
            location = r.headers.get('location')
            if location:
                # Location may be relative; resolve it against the response URL.
                return self.__resolve(urlparse.urljoin(r.url, location),
                                      request_verb='head')
            # A 3xx without a Location header (e.g. 304) cannot be followed.
            self.final_url = r.url
        elif 400 <= status < 405:
            except_msg = "Resolving %s has ended with code %i" % (r.url, status)
            logging.error(except_msg)
            raise requests.exceptions.HTTPError(except_msg)
        elif status == 405 and request_verb == 'head':
            logging.error("Resolving %s requires request verb GET (405 yielded)",
                          r.url)
            return self.__resolve(r.url, request_verb='get')
        elif status == 405 and request_verb == 'get':
            except_msg = "Requesting %s with GET yielded 405 status code" % (r.url)
            raise requests.exceptions.HTTPError(except_msg)
        elif 500 <= status < 600:
            try:
                r.raise_for_status()
            except requests.exceptions.HTTPError as e:
                logging.warning("Resolving %s caused ", r.url)
                logging.warning(e)
                if retry_count >= self.max_attempts:
                    raise
                # TODO: Needs a back-off strategy.
                logging.warning("Retrying (%i-th time)...", retry_count)
                return self.__resolve(r.url, request_verb=request_verb,
                                      retry_count=retry_count)
        elif status == 200 and request_verb == 'head':
            if self.__can_discover_url_from_response(r):
                # HEAD carries no body; fetch it to look for meta/canonical tags.
                return self.__resolve(r.url, request_verb='get')
            self.final_url = r.url
        elif status == 200:
            self.final_url = self.__final_url_from_response(r)
        # Any other status code leaves final_url untouched.
        return self.final_url

    def __final_url_from_response(self, r):
        """Return the URL that the body of response *r* points at.

        A meta refresh target is followed (with GET) unless it was already
        followed or points back at the page itself; otherwise a canonical
        link is used; otherwise ``r.url`` is returned.
        """
        discoverable = self.__can_discover_url_from_response(r)
        logging.debug("Can discover from %s? %i", r.url, discoverable)
        if not discoverable or not r.text:
            return r.url

        redirect_match = self.__find_meta_redirect(r.text)
        logging.debug("Meta refresh redirect: %s", redirect_match)
        if redirect_match:
            redirect_url = urlparse.urljoin(r.url, redirect_match)
            if redirect_url == r.url or redirect_url in self.history:
                # Self-refreshing page or refresh cycle: stop instead of
                # recursing until the interpreter's recursion limit.
                logging.warning("Meta refresh loop detected at %s", redirect_url)
                return r.url
            self.history.append(redirect_url)
            return self.__resolve(redirect_url, request_verb='get')

        canonical_match = self.__find_canonical_url(r.text)
        logging.debug("Meta canonical match: %s", canonical_match)
        if canonical_match:
            return urlparse.urljoin(r.url, canonical_match)

        logging.debug("Discovery did not change URL for %s", r.url)
        return r.url

    def __find_meta_redirect(self, response_text):
        """Return the stripped meta refresh target in *response_text*, or None."""
        match = self.re_redirect.search(response_text)
        if match:
            return match.group(1).strip()
        return None

    def __find_canonical_url(self, response_text):
        """Return the stripped canonical link href in *response_text*, or None."""
        match = self.re_canonical.search(response_text)
        if match:
            return match.group(1).strip()
        return None

    def __can_discover_url_from_response(self, r):
        """Return True when *r* declares a ``text/*`` Content-Type.

        A missing Content-Type header yields False rather than a KeyError.
        """
        content_type = r.headers.get('content-type')
        return content_type is not None \
            and content_type.lower().startswith('text')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment