Last active
September 29, 2015 15:30
-
-
Save fuglede/69bc96e50f30d4537d05 to your computer and use it in GitHub Desktop.
Python API for playing with results from starttls.info
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Ad hoc API for starttls.info. | |
This module allows for the fetching of mail server protocol tests from the | |
website https://starttls.info. | |
Example: | |
Get the result of a host which has already been checked but don't bother | |
doing the check if the host hasn't already been checked:: | |
>>> import starttlsinfo | |
>>> host = 'hackenturet.dk' | |
>>> stls = starttlsinfo.Starttlsinfo(host) | |
>>> result = stls.getResult(forceResult=False) | |
>>> result['actual_targets'][0]['score'] | |
u'94.8' | |
It can also force the test results to be updated and can be set up to | |
return the test results only after a test has been completed once:: | |
>>> import starttlsinfo | |
>>> host = 'brk.dk' # Some host which has never been checked. | |
>>> stls = starttlsinfo.Starttlsinfo(host) | |
>>> result = stls.getResult() | |
>>> result['actual_targets'][0]['score'] | |
u'32.1' | |
Or simply:: | |
>>> import starttlsinfo | |
>>> host = 'brk.dk' # Some host which has never been checked. | |
>>> stls = starttlsinfo.Starttlsinfo(host) | |
>>> stls.getScore() | |
[32.1]
""" | |
import json | |
import math | |
import re | |
import time | |
import urllib | |
import urllib2 | |
class Starttlsinfo(object):
    """Ad hoc client for the host check API at https://starttls.info.

    An instance is bound to a single hostname. Results fetched from the
    service are cached on the instance in ``_result``.

    Attributes:
        host (string): The hostname being checked.
        url (string): The API endpoint for `host`.
        timeout (int): Total time in seconds to keep polling for a result
            (defaults to 60).
        retry (int): Pause in seconds between polling attempts
            (defaults to 10).
    """

    def _is_valid_hostname(self, hostname):
        """Return True if and only if the hostname is a valid hostname.

        Note:
            Taken from https://stackoverflow.com/a/2532344

        Args:
            hostname (string): The hostname.

        Returns:
            bool: True if the hostname is valid. False otherwise (this
            includes the empty string).
        """
        if len(hostname) > 255:
            return False
        # Strip exactly one dot from the right, if present. `endswith` is
        # used rather than indexing so the empty string does not raise.
        if hostname.endswith("."):
            hostname = hostname[:-1]
        # `\Z` rather than `$`: `$` also matches just before a trailing
        # newline, which would let "example.com\n" through.
        allowed = re.compile(r"(?!-)[A-Z\d-]{1,63}(?<!-)\Z", re.IGNORECASE)
        return all(allowed.match(label) for label in hostname.split("."))

    def __init__(self, host):
        """Bind the client to `host`.

        Args:
            host (string): The hostname whose mail servers should be checked.

        Raises:
            ValueError: If `host` is not a valid hostname.
        """
        if not self._is_valid_hostname(host):
            raise ValueError("not a valid hostname")
        self.host = host
        self.url = 'https://starttls.info/api/check/' + self.host
        self.timeout = 60
        self.retry = 10
        self._result = None

    def setTimeout(self, timeout):
        """Set a timeout for how long we are willing to retry testing.

        Args:
            timeout (int): Maximum timeout in seconds.

        Raises:
            ValueError: If `timeout` is not a positive integer.
        """
        if not (isinstance(timeout, int) and timeout > 0):
            raise ValueError("not a positive integer")
        self.timeout = timeout

    def setRetry(self, retry):
        """Set the duration between consecutive result fetching attempts.

        Args:
            retry (int): Time in seconds.

        Raises:
            ValueError: If `retry` is not a positive integer.
        """
        if not (isinstance(retry, int) and retry > 0):
            raise ValueError("not a positive integer")
        self.retry = retry

    def getResult(self, forceResult=True, forceUpdate=False,
                  acceptInProgress=True, cached=True):
        """Get result of testing the given hostname.

        Args:
            forceResult (bool): Force unchecked hosts to be checked.
            forceUpdate (bool): Get updated results no matter what.
            acceptInProgress (bool): If `False`, then treat results from hosts
                that are currently being processed as invalid.
            cached (bool): Take cached result if `True`. Can not be used with
                forceUpdate.

        Raises:
            RuntimeError: If no result is available within the given timeout
                tolerance.

        Returns:
            The test result as a dictionary if successful. `None` otherwise.
        """
        if self._result is not None and cached and not forceUpdate:
            if acceptInProgress or self._result['status'] != 'IN PROGRESS':
                return self._result

        try:
            if forceUpdate:
                self.submitHost()
                return self._waitForResult()
            response = urllib2.urlopen(self.url)
        except urllib2.HTTPError:  # Catches non-checked hosts
            if forceResult:
                self.submitHost()
                return self._waitForResult()
            return None

        # Always release the connection, even if reading fails.
        try:
            payload = response.read()
        finally:
            response.close()

        self._result = json.loads(payload)
        if self._result['status'] != "DONE" and not acceptInProgress:
            return None
        return self._result

    def submitHost(self):
        """Force host to be checked again."""
        # We achieve this by submitting the data "reset=true". For
        # unchecked hosts, submitting the empty string also works,
        # but we kill two birds with one stone instead, always
        # forcing a reset.
        postdata = urllib.urlencode({'reset': 'true'})
        # The response body is not needed; close it right away.
        urllib2.urlopen(self.url, postdata).close()

    def _waitForResult(self):
        """Poll for a finished result until the timeout is used up.

        Sleeps `retry` seconds before each attempt and makes
        ceil(timeout / retry) attempts in total.

        Raises:
            RuntimeError: If no finished result was obtained in time.

        Returns:
            The test result as a dictionary.
        """
        # Convert before dividing: under Python 2, int / int floors, which
        # would make the ceil a no-op and drop the last partial interval.
        attempts = int(math.ceil(float(self.timeout) / self.retry))
        for _ in range(attempts):
            time.sleep(self.retry)
            # cached=False: a stale cached result from before the host was
            # (re)submitted must not be mistaken for the fresh one.
            # Assigning the return value also clears the cache while the
            # check is still in progress.
            self._result = self.getResult(forceResult=False,
                                          acceptInProgress=False,
                                          cached=False)
            if self._result is not None:
                return self._result
        raise RuntimeError("no result available before timeout")

    def getTime(self):
        """Return the time at which the check was carried out.

        Returns:
            The value of the result's 'status_changed' field, or `None` if
            it is not available.
        """
        result = self.getResult()
        try:
            return result['status_changed']
        except (KeyError, TypeError):
            return None

    def getScore(self):
        """Return the starttls.info score as a list of floats.

        Returns:
            One float per checked mail server, or `None` if the result does
            not contain usable scores.
        """
        result = self.getResult()
        try:
            return [float(target['score'])
                    for target in result['actual_targets']]
        except (KeyError, TypeError, ValueError):
            return None

    def getMX(self):
        """Return the mail servers that were checked.

        Returns:
            A list with the 'name' of each checked target, or `None` if the
            result does not contain them.
        """
        result = self.getResult()
        try:
            return [target['name'] for target in result['actual_targets']]
        except (KeyError, TypeError):
            return None

    def getScoreWithMX(self):
        """Return a dictionary of mail servers and their scores.

        Returns:
            A dict mapping mail server name to score (float), or `None` if
            anything goes wrong while obtaining them.
        """
        # Best effort: any failure, including while fetching, yields None.
        try:
            scores = self.getScore()
            servers = self.getMX()
            return dict(zip(servers, scores))
        except Exception:
            return None

    def getDetails(self):
        """Return (XML formatted) details about mail server STARTTLS support.

        Returns:
            A dict mapping mail server name to its 'description' field, or
            `None` if anything goes wrong while obtaining them.
        """
        # Best effort: any failure, including while fetching, yields None.
        try:
            servers = self.getMX()
            result = self.getResult()
            details = [target['description']
                       for target in result['actual_targets']]
            return dict(zip(servers, details))
        except Exception:
            return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment