Skip to content

Instantly share code, notes, and snippets.

@lewiseason
Created June 14, 2014 17:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lewiseason/6fa5bf1eb063a97e2ca6 to your computer and use it in GitHub Desktop.
Save lewiseason/6fa5bf1eb063a97e2ca6 to your computer and use it in GitHub Desktop.
Check status of some web services and send an email if they're down
#!/usr/bin/env python
# check - Check status of some web services
# and send an email if they're down.
# List of checks to perform. Tuples in the form
# (url, expected_http_response)
sender = 'noreply@example.com'
recipient = 'root@example.com'
subject = 'Checks Failed'
mailserver = 'localhost'
DEBUG = False
CACHE = '/tmp/check-py-services'
checks = [
('http://example.com', 200),
]
import sys
import threading
import urllib2
import smtplib
from email.mime.text import MIMEText
threads = []
results = []
def log(message):
if DEBUG:
print message
def check_status(url, expect):
global results
log("[%s] Checking" % (url))
try:
conn = urllib2.urlopen(url, timeout=5)
conn.close()
result = conn.getcode()
except urllib2.URLError:
# Typically a timeout
result = 'timeout'
log("[%s] got %s" % (url, result))
results.append({
'url': url,
'code': result,
'expect': expect,
'result': (result == expect),
})
# Perform the checks in parallel
for check in checks:
thread = threading.Thread(target=check_status, args=check)
thread.start()
threads.append(thread)
# Wait for them all to finish
for thread in threads:
thread.join()
log("Finished")
# If any results are False
failures = filter(lambda result: result['result'] == False, results)
# None found, nothing to do here
if not failures:
sys.exit(0)
log("Oops, something appears to be down. Notifying %s" % recipient)
downed_service_message = "\n".join([ " * %s returned %s" % (result['url'], result['code']) for result in failures])
# Check if this is the same as the list we had last time
# If it is, exit gracefully
# Just in case
open(CACHE, 'a').close()
with open(CACHE, 'r') as file:
cache = file.read()
if cache == downed_service_message:
sys.exit(0)
# If not, update the cached list and send the email
with open(CACHE, 'w') as file:
file.write(downed_service_message)
message = MIMEText("""Hello,
While performing automated checks, the following services were detected
to be in incorrect states:
%s
Cheers,
check.py robot
""" % downed_service_message)
message['Subject'] = subject
message['From'] = sender
message['To'] = recipient
message['X-Mailer'] = 'HTTP Services Checker <me@lewiseason.co.uk>'
s = smtplib.SMTP(mailserver)
s.sendmail(sender, [recipient], message.as_string())
s.quit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment