Create a gist now

Instantly share code, notes, and snippets.

@eight /htpchk.conf
Last active Sep 4, 2015

What would you like to do?
[test_HEAD]
url: http://example.com/
[test_GET]
url: http://example.com/
method: GET
[test_POST]
url: http://example.com/
method: POST
data: Hello World
[test_notfound]
url: http://example.com/notfound.html
notify_interval: 10
[DEFAULT]
#************************************************
; DEFAULT values
#************************************************
#Target URL.
#url=http://example.com/
#HTTP method
method: HEAD
#POST data
#data: Hello World
#When an HTTP error occurs, the notification is repeated at this interval (seconds).
notify_interval: 600
#After each check, wait this many seconds.
wait_seconds: 0
#TCP connection timeout in seconds.
tcp_timeout = 10
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""#8 Simple HTTP Checker
Usage:
htpchk (htpchk.conf is needed at same directory.)
htpchk URL
htpchk config-file
"""
# Program name, version and information page; all three are embedded in the
# User-Agent string built below.
__NAME__ = '#8 Simple HTTP Checker'
__VERSION__ = '1.2'
__ABOUT__ = 'http://jinim.jp/archives/2136'
# User-Agent header value sent with the requests made through urllib2.
__USER_AGENT__ = 'Mozilla/5.0 (compatible; %s/%s; +%s)' % (__NAME__, __VERSION__, __ABOUT__)
# strftime/strptime pattern for the timestamp that starts every log line; the
# same pattern is used to parse the timestamp back out of the state file.
DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
import os
import sys
import time
import socket
import urllib2
from urllib2 import URLError, HTTPError
from ssl import SSLError
from socket import timeout as SocketTimeout
import urlparse
import tempfile
from ConfigParser import ConfigParser
# Map a URL to a filesystem-safe name by hashing it.
try:
    _text_type = unicode  # Python 2: text strings are a separate type
except NameError:
    _text_type = str  # Python 3: str is already the text type


def _url_bytes(url):
    """Return url as a byte string, encoding text strings as UTF-8."""
    if isinstance(url, _text_type):
        return url.encode('utf-8')
    return url


try:
    import hashlib

    def _urlhash_filename(url):
        """Return the hexadecimal SHA-224 digest of url.

        url may be a byte string or a text string; text is encoded as UTF-8
        before hashing so that non-ASCII URLs do not raise.
        """
        return hashlib.sha224(_url_bytes(url)).hexdigest()
except ImportError:
    # Only a missing hashlib module should trigger the fallback; any other
    # error is a real problem and must not be hidden.
    import sha

    def _urlhash_filename(url):
        """Return the hexadecimal digest of url from the legacy sha module."""
        return sha.new(_url_bytes(url)).hexdigest()
# Install a process-wide urllib2 opener so that every urllib2.urlopen() call
# made afterwards sends the checker's own User-Agent header.
opener = urllib2.build_opener()
opener.addheaders = [('User-agent', __USER_AGENT__)]
urllib2.install_opener(opener)
class ParamError(Exception):
    """Raised for invalid input: an unsupported HTTP method or a missing config file."""
class HeadRequest(urllib2.Request):
def get_method(self): return "HEAD"
class Site(object):
    """One monitored URL together with its check and notification settings.

    Failure notifications are throttled through a state file kept in the
    system temporary directory and named after a hash of the URL.
    """

    def __init__(self, name, url, notify_interval=60*10, method="HEAD", data=None, wait_seconds=0, tcp_timeout=3):
        """Store the settings for one site.

        name -- label written into every log line for this site.
        url -- target URL.
        notify_interval -- minimum number of seconds between two failure
            notifications for this site.
        method -- "HEAD", "GET" or "POST".
        data -- request body; required when method is POST.
        wait_seconds -- seconds to pause after this site has been checked.
        tcp_timeout -- socket timeout in seconds; values below 1 are raised
            to 1.
        """
        self.name = name
        self.url = url
        self.method = method
        self.data = data
        self.notify_interval = notify_interval
        self.wait_seconds = wait_seconds
        # The minimum timeout is 1 second.
        if tcp_timeout < 1:
            tcp_timeout = 1
        self.tcp_timeout = tcp_timeout
        # State file that records when failures were last notified.
        self.notified = os.path.join(tempfile.gettempdir(), "htpcheck-%s" % _urlhash_filename(self.url))

    def check(self):
        """Request the URL with the configured method and return the response headers.

        Raises ParamError when the method is not HEAD, GET or POST. Errors
        raised by urllib2 while opening the URL propagate to the caller.
        """
        # The timeout is applied through the process-wide socket default.
        socket.setdefaulttimeout(self.tcp_timeout)
        if self.method == "HEAD":
            response = urllib2.urlopen(HeadRequest(self.url))
        elif self.method == "POST":
            response = urllib2.urlopen(self.url, data=self.data)
        elif self.method == "GET":
            response = urllib2.urlopen(self.url)
        else:
            raise ParamError("method: %s not supported. Supported methods are (HEAD,GET,POST)." % self.method)
        try:
            return response.info()
        finally:
            # Only the headers are needed; close the response so the
            # connection is not left open until garbage collection.
            response.close()

    def recovered(self):
        """Archive the state file by renaming it with a "-recovered.<timestamp>" suffix."""
        directory, filename = os.path.split(self.notified)
        archived = filename + time.strftime("-recovered.%Y%m%d%H%M%S")
        os.rename(self.notified, os.path.join(directory, archived))

    def _last_notified(self):
        """Return the epoch time of the last recorded notification, or 0 if unknown.

        The timestamp is the first tab-separated field of the last line of
        the state file. A missing, empty or unparsable state file yields 0.
        """
        if not os.path.exists(self.notified):
            return 0
        state = open(self.notified)
        try:
            lines = state.readlines()
        finally:
            state.close()
        try:
            stamp = lines[-1].split("\t")[0]
            return time.mktime(time.strptime(stamp, DATETIME_FORMAT))
        except (IndexError, ValueError):
            # IndexError: the state file is empty.
            # ValueError: the last line does not start with a valid timestamp.
            return 0

    def notify(self, err):
        """Report a failure, at most once per notify_interval seconds.

        When the interval has elapsed, the failure line is written to stderr
        and appended to the state file. Otherwise a "[SUPPRESSED]" line is
        written to stdout and the state file is left unchanged.
        """
        msg = log_message(self, err)
        if time.time() > (self._last_notified() + self.notify_interval):
            sys.stderr.write(msg)
            state = open(self.notified, 'a')
            try:
                # msg ends with a newline; drop it before adding the marker.
                state.write("%s\tnotified\n" % msg[:-1])
            finally:
                state.close()
        else:
            sys.stdout.write(log_message(self, "[SUPPRESSED] %s" % err))
def log_message(site, msg=None):
    """Build one tab-separated, newline-terminated log line for site.

    site -- object providing name, method and url attributes.
    msg -- None for a success ("OK") line; any other value marks a failure
        and is appended as the last field of an "NG" line.

    Returns the formatted line, which starts with the current local time.
    """
    stamp = time.strftime(DATETIME_FORMAT)
    # Test against None explicitly so that an empty or otherwise falsy error
    # value is still reported as a failure instead of as "OK".
    if msg is None:
        return '%s\tOK\t%s\t%s\t%s\n' % (stamp, site.name, site.method, site.url)
    return '%s\tNG\t%s\t%s\t%s\t%s\n' % (stamp, site.name, site.method, site.url, msg)
def config_parse(conf):
    """Read the INI file at path conf and return a list with one Site per section.

    Each section name becomes the site name. Every section must resolve the
    options url, method, notify_interval, wait_seconds and tcp_timeout, and
    also data when method is POST; the parser raises if one is missing.
    """
    parser = ConfigParser()
    conf_file = open(conf)
    try:
        parser.readfp(conf_file)
    finally:
        # Close the file explicitly instead of leaving it to the garbage collector.
        conf_file.close()
    sites = []
    for section in parser.sections():
        url = parser.get(section, 'url')
        method = parser.get(section, 'method')
        # The request body is only read for POST.
        data = None
        if method == "POST":
            data = parser.get(section, 'data')
        notify_interval = parser.getfloat(section, 'notify_interval')
        wait_seconds = parser.getfloat(section, 'wait_seconds')
        tcp_timeout = parser.getint(section, 'tcp_timeout')
        sites.append(Site(section, url, notify_interval, method, data, wait_seconds, tcp_timeout))
    return sites
def main(sites):
    """Check each site in order, logging successes and reporting failures.

    A success writes an OK line to stdout and archives any existing
    notification state file. A timeout, SSL, URL or HTTP error is passed to
    the site's notify(). After each site, the run pauses for that site's
    wait_seconds when it is positive.
    """
    for target in sites:
        try:
            target.check()
            sys.stdout.write(log_message(target))
            if os.path.exists(target.notified):
                target.recovered()
        except (SocketTimeout, SSLError, URLError, HTTPError):
            # Fetch the active exception explicitly.
            target.notify(sys.exc_info()[1])
        pause = target.wait_seconds
        if pause > 0:
            time.sleep(pause)
if __name__ == '__main__':
    # Take the target from the first command-line argument, or fall back to
    # a ".conf" file that shares this script's path and base name.
    if len(sys.argv) > 1:
        config = sys.argv[1]
    else:
        config = os.path.splitext(__file__)[0] + '.conf'

    if os.path.exists(config):
        sites = config_parse(config)
    elif config.startswith('http://') or config.startswith('https://'):
        # Not an existing file but a URL: check it alone, named after its host.
        name = urlparse.urlparse(config)[1].split(':')[0]
        sites = (Site(name, config), )
    else:
        raise ParamError("config file %s not found." % config)
    main(sites)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment