Created
June 14, 2014 17:49
-
-
Save lewiseason/6fa5bf1eb063a97e2ca6 to your computer and use it in GitHub Desktop.
Check status of some web services and send an email if they're down
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# check - Check status of some web services
#         and send an email if they're down.

# --- Notification email settings -------------------------------------------
# Address used both as the SMTP envelope sender and the From: header.
sender = "noreply@example.com"
# Address that receives the failure report (envelope recipient and To:).
recipient = "root@example.com"
# Subject: header of the failure report.
subject = "Checks Failed"
# Host name handed to smtplib.SMTP() when sending the report.
mailserver = "localhost"

# --- Runtime behaviour ------------------------------------------------------
# When true, log() prints progress messages; otherwise the script is silent.
DEBUG = False
# File holding the failure list from the last report, used to avoid sending
# the same report twice in a row.
CACHE = "/tmp/check-py-services"

# --- Checks -----------------------------------------------------------------
# List of checks to perform. Tuples in the form
#   (url, expected_http_response)
checks = [
    ("http://example.com", 200),
]
import sys | |
import threading | |
import urllib2 | |
import smtplib | |
from email.mime.text import MIMEText | |
# Worker threads started for the checks; kept so they can all be joined.
threads = list()
# One dict per completed check, appended by check_status() from its thread.
results = list()
def log(message):
    """Print *message* to stdout, but only when the DEBUG flag is set."""
    if not DEBUG:
        return
    # Parenthesised single-argument form prints the same text on Python 2
    # as the bare print statement.
    print(message)
def check_status(url, expect):
    """Fetch *url* and record whether it answered with the expected status.

    Appends one dict to the module-level ``results`` list with the keys
    ``url``, ``code`` (the HTTP status received, or the string ``'timeout'``
    when no HTTP response could be obtained), ``expect`` and ``result``
    (True when ``code == expect``).

    Parameters:
        url: Address to request; passed straight to ``urllib2.urlopen``.
        expect: HTTP status code the service is expected to return.

    Returns:
        None. The outcome is reported through ``results`` only.
    """
    # Imported here so the function is self-contained and does not rely on
    # the file's top-level import block being extended.
    import socket

    log("[%s] Checking" % (url))

    try:
        conn = urllib2.urlopen(url, timeout=5)
    except urllib2.HTTPError as err:
        # HTTPError is a subclass of URLError, so it must be handled first.
        # It still carries a real HTTP status (e.g. 404, 500); record that
        # instead of mislabelling the response as a timeout, so a check
        # that expects a non-2xx status can actually succeed.
        result = err.code
    except (urllib2.URLError, socket.error):
        # No HTTP response at all: DNS failure, refused connection, or a
        # timeout. socket.timeout (a socket.error) can be raised directly
        # while waiting for the response headers; left uncaught it would
        # end the thread without recording any result, hiding the outage.
        result = 'timeout'
    else:
        try:
            # Read the status before releasing the connection.
            result = conn.getcode()
        finally:
            conn.close()

    log("[%s] got %s" % (url, result))

    results.append({
        'url': url,
        'code': result,
        'expect': expect,
        'result': (result == expect),
    })
# Perform the checks in parallel: one thread per (url, expected_status)
# tuple, which is unpacked into check_status(url, expect).
for check in checks:
    thread = threading.Thread(target=check_status, args=check)
    thread.start()
    threads.append(thread)

# Wait for them all to finish
for thread in threads:
    thread.join()

log("Finished")

# Keep only the checks whose status did not match. Threads append their
# results in completion order, so sort by URL to make the report text stable
# from run to run; otherwise the comparison with the cached report below
# could fail for an identical set of failures and trigger duplicate emails.
failures = sorted(
    [result for result in results if not result['result']],
    key=lambda result: result['url']
)

# None found, nothing to report
if not failures:
    # Forget the previously reported outage. Without this, the same outage
    # recurring after a recovery would match the stale cache and never be
    # reported again.
    with open(CACHE, 'w'):
        pass
    sys.exit(0)

log("Oops, something appears to be down. Notifying %s" % recipient)

downed_service_message = "\n".join(
    [" * %s returned %s" % (failure['url'], failure['code'])
     for failure in failures]
)

# Check if this is the same as the list we had last time.
# If it is, exit gracefully.

# Make sure the cache file exists (and is writable) before reading it.
with open(CACHE, 'a'):
    pass

with open(CACHE, 'r') as cache_file:
    cache = cache_file.read()

if cache == downed_service_message:
    sys.exit(0)

# If not, send the email and then update the cached list.
message = MIMEText("""Hello,
While performing automated checks, the following services were detected
to be in incorrect states:
%s
Cheers,
check.py robot
""" % downed_service_message)

message['Subject'] = subject
message['From'] = sender
message['To'] = recipient
message['X-Mailer'] = 'HTTP Services Checker <me@lewiseason.co.uk>'

s = smtplib.SMTP(mailserver)
try:
    s.sendmail(sender, [recipient], message.as_string())
finally:
    # Always release the SMTP connection, even if delivery failed.
    s.quit()

# Only remember the report once it has actually been sent; writing the cache
# first would suppress the notification for good if sending raised.
with open(CACHE, 'w') as cache_file:
    cache_file.write(downed_service_message)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment