Created
August 25, 2014 05:55
-
-
Save t3rmin4t0r/accc80d16e28d2cfd5bc to your computer and use it in GitHub Desktop.
yarn top csv generator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import re,math,string | |
from time import sleep | |
import os.path as path | |
import urllib2 | |
from json import loads as json_parse | |
from collections import defaultdict | |
import getopt | |
import datetime | |
import time | |
from urlparse import urljoin, urlparse, urlunparse | |
import csv | |
try: | |
from urllib.request import urlopen | |
except: | |
from urllib2 import urlopen as urlopen | |
unix_timestamp = time.time | |
class ApplicationAllocation(object): | |
def __init__(self): | |
self.application_id = None | |
self.allocations = defaultdict(lambda : (0, 0)) | |
def update(self, ts, allocations): | |
self.allocations[ts] = allocations | |
def get(self, ts): | |
return self.allocations[ts] | |
class AllocationHistory(object): | |
def __init__(self, ws_url): | |
ws_tuple = tuple(urlparse(ws_url)) | |
ws_tuple = ws_tuple[:2] + ("/ws/v1/cluster/apps?states=running",) + ws_tuple[3:] | |
self.ws_url = urlunparse(ws_tuple) | |
self.apps = defaultdict(ApplicationAllocation) | |
self.timespans = set() | |
def process(self, ts, a): | |
appid = a["id"] | |
vcores = a["allocatedVCores"] | |
mem = a['allocatedMB'] | |
alloc = self.apps[appid] | |
if not alloc.application_id: | |
alloc.application_id = appid | |
alloc.update(ts, (vcores, mem)) | |
def collect(self): | |
ts = unix_timestamp() | |
self.timespans.add(ts) | |
raw = json_parse(urlopen(self.ws_url).read()) | |
apps = raw["apps"] | |
if not apps: | |
return | |
map(lambda a : self.process(ts, a), apps["app"]) | |
def output(self): | |
apps = list(sorted(self.apps.keys())) | |
columns = ["timestamp"] + apps | |
mem = [] | |
vcores = [] | |
for ts in sorted(self.timespans): | |
mem_row = [ts] | |
vcore_row = [ts] | |
for app in apps: | |
(c, m) = self.apps.get(app).get(ts) | |
vcore_row.append(c) | |
mem_row.append(m) | |
mem.append(mem_row) | |
vcores.append(vcore_row) | |
w = csv.writer(sys.stdout) | |
w.writerow(columns) | |
w.writerows(mem) | |
#w.close() | |
def usage(): | |
print "yarn-top-csv.py http://rm-host:8088/" | |
def main(args): | |
opts, args=getopt.getopt(args, '', []) | |
# print opts | |
if not args: | |
return usage() | |
ws_url = args[0] | |
history = AllocationHistory(ws_url) | |
try: | |
while True: | |
history.collect() | |
sleep(1.0) | |
except KeyboardInterrupt, SystemExit: | |
history.output() | |
except: | |
raise | |
if __name__ == "__main__": | |
sys.exit(main(sys.argv[1:]) or 0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment