Skip to content

Instantly share code, notes, and snippets.

@gkadillak
Created August 13, 2015 17:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gkadillak/50d3111d1456e69480c4 to your computer and use it in GitHub Desktop.
Save gkadillak/50d3111d1456e69480c4 to your computer and use it in GitHub Desktop.
class MemoryWidgetNamespace(BaseNamespace, BroadcastMixin):
switch = False
def recv_disconnect(self):
self.switch = False
self.disconnect()
def on_send_data(self):
self.spawn(self.send_meminfo_data)
self.switch = True
logger.debug('sending data!')
def send_meminfo_data(self):
logger.debug('meminfo sent')
while self.switch:
stats_file = '/proc/meminfo'
(total, free, buffers, cached, swap_total, swap_free, active, inactive,
dirty,) = (None,) * 9
with open(stats_file) as sfo:
for l in sfo.readlines():
if (re.match('MemTotal:', l) is not None):
total = int(l.split()[1])
elif (re.match('MemFree:', l) is not None):
free = int(l.split()[1])
elif (re.match('Buffers:', l) is not None):
buffers = int(l.split()[1])
elif (re.match('Cached:', l) is not None):
cached = int(l.split()[1])
elif (re.match('SwapTotal:', l) is not None):
swap_total = int(l.split()[1])
elif (re.match('SwapFree:', l) is not None):
swap_free = int(l.split()[1])
elif (re.match('Active:', l) is not None):
active = int(l.split()[1])
elif (re.match('Inactive:', l) is not None):
inactive = int(l.split()[1])
elif (re.match('Dirty:', l) is not None):
dirty = int(l.split()[1])
break # no need to look at lines after dirty.
ts = datetime.utcnow().replace(tzinfo=utc)
self.emit('widgets:memory', {
'key': 'widgets:memory', 'data': {'results': [{
'total': total, 'free': free, 'buffers': buffers,
'cached': cached, 'swap_total': swap_total,
'swap_free': swap_free, 'active': active,
'inactive': inactive, 'dirty': dirty, 'ts': str(ts)
}]
}
})
gevent.sleep(1)
class Application(object):
def __init__(self):
self.buffer = []
def __call__(self, environ, start_response):
path = environ['PATH_INFO'].strip('/') or 'index.html'
if path.startswith('/static') or path == 'index.html':
try:
data = open(path).read()
except Exception:
return not_found(start_response)
if path.endswith(".js"):
content_type = "text/javascript"
elif path.endswith(".css"):
content_type = "text/css"
elif path.endswith(".swf"):
content_type = "application/x-shockwave-flash"
else:
content_type = "text/html"
start_response('200 OK', [('Content-Type', content_type)])
return [data]
if path.startswith("socket.io"):
socketio_manage(environ, {'/memory-widget': MemoryWidgetNamespace})
def not_found(start_response):
start_response('404 Not Found', [])
return ['<h1>Not found</h1>']
def main():
logger.debug('Listening on port http://127.0.0.1:8080 and on port 10843 (flash policy server)')
SocketIOServer(('127.0.0.1', 8001), Application(),
resource="socket.io", policy_server=True).serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment