Skip to content

Instantly share code, notes, and snippets.

@edma2
Created April 27, 2012 07:33
Show Gist options
  • Save edma2/2506971 to your computer and use it in GitHub Desktop.
Save edma2/2506971 to your computer and use it in GitHub Desktop.
# beaglenmt diagnostics web server - Vanya Sergeev <vsergeev at gmail.com>
# http://dev.frozeneskimo.com/embedded_projects/beaglebone_network_multitool_server
#
import copy
import functools
import os
import shlex
import subprocess
import threading
import time

import cherrypy
# Path of the minifortune executable and of the fortune directory passed to it.
FORTUNE_BIN_PATH = "/home/edma2/minifortune"
FORTUNE_DIR_PATH = "/home/edma2/fortunes"
# Number of seconds a rendered stats snapshot is served from the cache.
STATS_UPDATE_INTERVAL = 3
# Network interfaces reported in the address and RX/TX sections of the page.
STATS_INTERFACES = ["eth0", "tun6in4"]
# Machine/OS description, captured once when the module is loaded.
# universal_newlines=True makes check_output return text (not bytes) on
# Python 3 too, so the value can be concatenated with the other str pieces of
# the page.  The trailing newline printed by uname is deliberately kept.
uname = subprocess.check_output(["uname", "-msnro"], universal_newlines=True)
class ExpireDict(dict):
    """A thread-safe dictionary with items that expire after a timeout.

    Every assignment records the time at which it happened.  A membership
    test (``key in d``) only succeeds while the entry is younger than
    ``timeout`` seconds.  Plain item access is inherited from ``dict`` and
    does not look at the age of the entry, and stale entries are never
    removed; they are simply overwritten by the next assignment.
    """

    def __init__(self, timeout):
        """Create an empty dictionary whose entries stay fresh for *timeout* seconds."""
        super(ExpireDict, self).__init__()
        self.times = {}            # key -> time.time() of the last assignment
        self.timeout = timeout
        self.lock = threading.Lock()

    def __setitem__(self, key, value):
        """Store *value* under *key* and restart the expiry clock of *key*."""
        # "with" releases the lock even when the assignment raises (for
        # example on an unhashable key); a manual acquire()/release() pair
        # would leave the lock held and block every later writer.
        with self.lock:
            self.times[key] = time.time()
            super(ExpireDict, self).__setitem__(key, value)

    def __contains__(self, item):
        """Return True only if *item* is stored and has not timed out yet."""
        with self.lock:
            if not super(ExpireDict, self).__contains__(item):
                return False
            return self.expired(item)

    def expired(self, item):
        """Return True while *item* is still fresh, False once it has timed out.

        NOTE: despite its name this reports freshness, not expiry; the name
        and return value are kept unchanged for backward compatibility.
        Raises KeyError if *item* was never assigned.
        """
        return time.time() - self.times[item] < self.timeout
def cached(timeout):
    """Return a decorator that caches a function's results for *timeout* seconds.

    Results are keyed by the tuple of positional arguments, so the arguments
    must be hashable; keyword arguments are not supported.  While a cached
    result is younger than *timeout* seconds it is returned without calling
    the function again.
    """
    def wrapper(func):
        # One cache per decorated function: creating it here (rather than once
        # per cached() call) keeps results of different functions apart even
        # if the same decorator object is applied to several of them.
        cache = ExpireDict(timeout)

        @functools.wraps(func)
        def wrapped(*args):
            if args in cache:
                return cache[args]
            result = func(*args)
            cache[args] = result
            return result
        return wrapped
    return wrapper
# Decorator to wrap try/except blocks with an error return
def try_except(err):
    """Return a decorator that makes a function return *err* instead of raising.

    The decorated function is called with its original arguments.  If it
    raises any ``Exception`` subclass, *err* (the very same object on every
    failure) is returned in place of a result.  ``SystemExit`` and
    ``KeyboardInterrupt`` are not ``Exception`` subclasses and therefore still
    propagate, so the process can be shut down normally.
    """
    def _try_except(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception:
                return err
        return wrapped
    return _try_except
@try_except("Error fetching fortune!\n")
def fetch_fortune():
    """Return the text printed by the minifortune binary.

    The program at FORTUNE_BIN_PATH is run with FORTUNE_DIR_PATH as its only
    argument and its stderr is merged into the captured output.  Any failure
    is turned into the error string by the decorator.
    """
    # Look up a fortune with minifortune
    command = [FORTUNE_BIN_PATH, FORTUNE_DIR_PATH]
    # universal_newlines=True yields text on Python 3 as well, so the result
    # can be concatenated into the HTML page.
    return subprocess.check_output(command, stderr=subprocess.STDOUT, universal_newlines=True)
@try_except((["Error looking up IPv4 Addresses!"], ["Error looking up IPv6 Addresses!"]))
def fetch_ip():
    """Return ``(ipv4_addresses, ipv6_addresses)`` for the monitored interfaces.

    ``ip addr show dev <iface>`` is run for every entry of STATS_INTERFACES.
    The second field of each ``inet`` line is collected as an IPv4 address.
    The second field of each ``inet6`` line is collected as an IPv6 address
    only when it starts with "2001".  Interfaces for which ``ip`` exits with
    an error are skipped; any other failure is turned into the pair of error
    lists by the decorator.
    """
    # Look up our IPv4 and Global Scope IPv6 Addresses with 'ip'
    ipv4_addresses = []
    ipv6_addresses = []
    for iface in STATS_INTERFACES:
        try:
            # universal_newlines=True gives text on Python 3 as well; the
            # bytes result would make the str-based parsing below raise.
            output = subprocess.check_output(
                ["ip", "addr", "show", "dev", iface],
                stderr=subprocess.STDOUT, universal_newlines=True)
        except subprocess.CalledProcessError:
            # `ip` exited non-zero (presumably the interface is absent).
            continue
        for line in output.split("\n"):
            fields = shlex.split(line)
            if len(fields) <= 2:
                continue
            if fields[0] == "inet":
                ipv4_addresses.append(fields[1])
            if fields[0] == "inet6" and fields[1].startswith("2001"):
                ipv6_addresses.append(fields[1])
    return (ipv4_addresses, ipv6_addresses)
@try_except("Error processing /proc/uptime!")
def fetch_uptime():
    """Return the uptime formatted as ``"<d> days, HH:MM:SS h:m:s"``.

    The first space-separated field of /proc/uptime is read as a number of
    seconds and broken down into days, hours, minutes and seconds.  Any
    failure is turned into the error string by the decorator.
    """
    # Process /proc/uptime
    # "with" closes the file even if reading or parsing raises.
    with open('/proc/uptime', 'r') as proc_uptime_f:
        uptime_sec = float(proc_uptime_f.read().split(' ')[0])
    uptime_min, uptime_sec = divmod(uptime_sec, 60)
    uptime_hour, uptime_min = divmod(uptime_min, 60)
    uptime_day, uptime_hour = divmod(uptime_hour, 24)
    return "%d days, %02d:%02d:%02d h:m:s" % (uptime_day, uptime_hour, uptime_min, uptime_sec)
@try_except(["Error processing /proc/net/dev!"])
def fetch_netrxtx():
    """Return one "received / transmitted" line per monitored interface.

    /proc/net/dev is read and, for every interface named in STATS_INTERFACES,
    the first counter after the colon (bytes received) and the ninth
    (bytes transmitted) are reported in MB (1 MB = 1048576 bytes).  The lines
    come out in STATS_INTERFACES order.  If none of the interfaces is found,
    or anything goes wrong, a one-element list holding the error message is
    returned.
    """
    # Process /proc/net/dev
    # "with" closes the file even if reading raises.
    with open('/proc/net/dev', 'r') as proc_netdev_f:
        lines = proc_netdev_f.read().split('\n')

    netdev_txrx_count = {}
    for line in lines:
        # Split on the colon rather than on whitespace: the first counter may
        # follow "iface:" without a space, which would shift every column.
        name, sep, counters = line.partition(':')
        if not sep:
            # No "iface:" prefix, e.g. the column header lines.
            continue
        name = name.strip()
        fields = counters.split()
        # Exact name match, so that e.g. "veth0" is not mistaken for "eth0".
        if name in STATS_INTERFACES and len(fields) > 8:
            netdev_txrx_count[name] = (int(fields[0]), int(fields[8]))

    netrxtx_stat = [
        "%s: %.2f MB received, %.2f MB transmitted" % (
            iface,
            netdev_txrx_count[iface][0] / 1048576.0,
            netdev_txrx_count[iface][1] / 1048576.0)
        for iface in STATS_INTERFACES if iface in netdev_txrx_count
    ]
    if not netrxtx_stat:
        netrxtx_stat = ["Error processing /proc/net/dev!"]
    return netrxtx_stat
@try_except("Error processing /proc/loadavg!")
def fetch_loadavg():
    """Return the first three space-separated fields of /proc/loadavg.

    The fields are joined with single spaces.  Any failure is turned into
    the error string by the decorator.
    """
    # Process /proc/loadavg
    # "with" closes the file even if reading raises.
    with open('/proc/loadavg', 'r') as proc_loadavg_f:
        fields = proc_loadavg_f.read().split(' ')
    return ' '.join(fields[0:3])
@try_except("Error processing /proc/meminfo!")
def fetch_memfree():
    """Return free memory as ``"<free> <unit> / <total> <unit> = <pct>%"``.

    The MemFree and MemTotal entries of /proc/meminfo are looked up by name,
    so the result does not depend on the order of the lines in the file.  A
    missing entry or any other failure is turned into the error string by
    the decorator.
    """
    # Process /proc/meminfo
    meminfo = {}
    # "with" closes the file even if reading or parsing raises.
    with open('/proc/meminfo', 'r') as proc_meminfo_f:
        for line in proc_meminfo_f:
            key, sep, rest = line.partition(':')
            if sep:
                meminfo[key.strip()] = rest.split()
    total_value, total_unit = meminfo["MemTotal"][0:2]
    free_value, free_unit = meminfo["MemFree"][0:2]
    percent_free = (float(free_value) / float(total_value)) * 100.0
    return "%s %s / %s %s = %.2f%%" % (free_value, free_unit, total_value, total_unit, percent_free)
def _stat_section(label, entries):
    """Return *label* followed by one newline-terminated line per entry.

    The first entry shares the label's line; every further entry is preceded
    by a single space.  With no entries only the label line is produced.
    """
    if not entries:
        return label + "\n"
    return label + " ".join(entry + "\n" for entry in entries)


@cached(STATS_UPDATE_INTERVAL)
def get():
    """Collect all statistics and return ``(stats_text, fortune_text)``.

    The fortune is fetched first, then addresses, uptime, network counters,
    load averages and free memory; the snapshot is timestamped last and
    rendered as plain text.  The result is cached for STATS_UPDATE_INTERVAL
    seconds by the decorator.
    """
    # Fortune for our IPv6 guests
    fortune_text = fetch_fortune()
    # IPv4 and Global Scope IPv6 Addresses
    v4_list, v6_list = fetch_ip()
    # Uptime, network RX/TX counters, load averages, free memory
    uptime_text = fetch_uptime()
    traffic_lines = fetch_netrxtx()
    load_text = fetch_loadavg()
    mem_text = fetch_memfree()
    # Timestamp this update
    stamp = time.asctime(time.localtime())

    # Format the stats results
    pieces = [
        _stat_section("IPv4 Addresses: ", v4_list),
        _stat_section("IPv6 Addresses: ", v6_list),
        "\n",
        "Machine/OS: " + uname + "\n",
        "Uptime: " + uptime_text + "\n",
        _stat_section("Network: ", traffic_lines),
        "Load Avg: " + load_text + "\n",
        "Mem Free: " + mem_text + "\n",
        "Last Updated: " + stamp + "\n",
        "\n",
    ]
    return ("".join(pieces), fortune_text)
class Root:
    """CherryPy handler serving the diagnostics page at the site root.

    The page is RESPONSE_HEAD, a welcome heading, the statistics and fortune
    returned by get(), and RESPONSE_FOOT.  The wording of the heading and of
    the fortune line depends on whether the client is treated as IPv4 or IPv6.
    """

    # Opening markup: document head with the inline stylesheet, then the
    # opening tag of the #main container.
    RESPONSE_HEAD = (
        "<html><head><title>beaglenmt</title><style type=\"text/css\">"
        "body { background: #1f1f1f; } "
        "a { text-decoration: none; } "
        "a:link, a:visited { color: #e3ceab; } "
        "a:hover { color: #8cd0d3; } "
        "#main { background: #3f3f3f; color: #dcdcdc; margin: 50px auto; padding: 15px; width: 600px; font-size: 13px; font-family: 'Helvetica Neue', Helvetica, Arial, Geneva, sans-serif; -webkit-border-radius: 15px; -moz-border-radius: 15px; border-radius: 15px; } "
        "#main pre { white-space: pre-wrap; margin: 15px 15px 0 15px; font-family: monospace; font-size: 12px; } "
        "#main h3 { margin: 0; text-align: center; line-height: 30px; border-bottom: 1px solid #6c6c6c; font-size: 15px; }"
        "</style></head><body><div id=\"main\">\n"
    )
    # Closing markup: the "Powered by" credits and the closing tags.
    RESPONSE_FOOT = (
        "Powered by: "
        "<a href=\"http://beagleboard.org/bone\">beaglebone</a>, "
        "<a href=\"http://buildroot.org/\">buildroot</a>, "
        "<a href=\"http://www.cherrypy.org/\">cherrypy</a>, "
        "<a href=\"https://github.com/vsergeev/minifortune\">minifortune</a>, "
        "<a href=\"https://gist.github.com/2498485\">beaglenmt_httpd.py</a>, "
        "<a href=\"http://freedns.afraid.org/\">FreeDNS</a>\n"
        "</div></body></html>"
    )

    def _render(self, family, fortune_line):
        """Assemble the page for address *family*, introducing the fortune with *fortune_line*."""
        stats_text, fortune_text = get()
        heading = (
            "<h3>Welcome to the <a href=\"http://dev.frozeneskimo.com/embedded_projects/beaglebone_network_multitool_server\">BeagleBone Network Multitool Server</a> via "
            + family + "!<br/></h3>"
        )
        content = (
            "<pre>\n" + stats_text + "</pre>\n" + fortune_line
            + "<br/>\n<pre>\n" + fortune_text + "</pre><br/>\n"
        )
        return "".join([self.RESPONSE_HEAD, heading, content, self.RESPONSE_FOOT])

    def build_response_ipv6(self):
        """Return the full HTML page shown to IPv6 clients."""
        return self._render("IPv6", "You connected via IPv6, you get a fortune!")

    def build_response_ipv4(self):
        """Return the full HTML page shown to IPv4 clients."""
        return self._render("IPv4", "You connected via IPv4, you get a fortune too!")

    def index(self):
        """Serve the page variant matching the client's address family."""
        # An address starting with "::ffff" is an IPv4-mapped-to-IPv6 address,
        # i.e. the client actually connected over IPv4.
        if cherrypy.request.remote.ip.startswith("::ffff"):
            builder = self.build_response_ipv4
        else:
            builder = self.build_response_ipv6
        return builder()
    index.exposed = True
# Global CherryPy settings, applied when this module is executed.
cherrypy.config.update({
# Listen on the IPv6 wildcard address, port 8080.  Root.index() tells IPv4
# clients apart by their "::ffff" address prefix -- presumably IPv4 connections
# arrive on this socket as IPv4-mapped addresses; confirm on the target system.
'server.socket_host': '::',
'server.socket_port': 8080,
'server.thread_pool': 20,
'server.socket_queue_size': 15,
# Access and error logs go to /tmp; the file names embed this process's PID.
'log.access_file': "/tmp/beaglenmt_httpd_%d_access.log" % os.getpid(),
'log.error_file': "/tmp/beaglenmt_httpd_%d_error.log" % os.getpid(),
'log.screen': False,
'environment': 'production',
})
# The PID-file and privilege-dropping plugins below are currently disabled.
#cherrypy.process.plugins.PIDFile(cherrypy.engine, '/tmp/beaglenmt_httpd.pid').subscribe()
#cherrypy.process.plugins.DropPrivileges(cherrypy.engine, umask=0644, uid=99, gid=99).subscribe()
# Mount Root at '/' and start serving; the staticfile tool is switched off for
# /favicon.ico.
cherrypy.quickstart(Root(), '/', config = { '/favicon.ico' : { 'tools.staticfile.on': False }})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment