Skip to content

Instantly share code, notes, and snippets.

@hey-jude
Last active October 23, 2020 12:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hey-jude/986e4809f054ef43517bf4dcc410f01c to your computer and use it in GitHub Desktop.
Save hey-jude/986e4809f054ef43517bf4dcc410f01c to your computer and use it in GitHub Desktop.
Export YARN application resource usage to InfluxDB
import os
import sys
import subprocess
import json
import requests
import logging
from influxdb import InfluxDBClient
from influxdb.client import InfluxDBClientError
# Connection settings for the target InfluxDB instance.  The published copy of
# this script had these values blanked out (the port assignment had no value
# at all, which is a syntax error), so fill them in before use.
host = ''
port = 8086  # InfluxDB's default HTTP API port; change if yours differs
user = ''
password = ''
dbname = ''

YARN_BIN = '/app/yarn/bin/yarn'
MEASUREMENT = 'yarn_res'
ALLOCATION_KEY = 'Aggregate Resource Allocation'

# Keys of the `yarn application -status` report that the exporter consumes.
STATUS_KEYS = (
    'Application-Name',
    'Application-Type',
    'User',
    'Queue',
    ALLOCATION_KEY,
)

# (prefix, tag value) pairs, checked in order, so the more specific
# "oozie_launcher" / "oozie_action" must stay ahead of the bare "oozie".
NAME_PREFIXES = (
    ('HIVE-', 'HIVE'),
    ('oozie_launcher', 'oozie_launcher'),
    ('oozie_action', 'oozie_action'),
    ('oozie', 'oozie'),
    ('streamjob', 'streamjob.jar'),
)

# If one of these markers occurs in a name, only the text before its first
# occurrence is kept.  NOTE: '2019' is a hard-coded year; names carrying any
# other year fall through to the generic truncation branch.
NAME_CUT_MARKERS = ('-Optional', '2019')

MAX_NAME_LENGTH = 45


def run_yarn(*args):
    """Run ``yarn application <args>`` and return its stdout as text.

    The command is executed without a shell, so the application id is never
    interpreted by one.  stderr is discarded.  Undecodable bytes are replaced
    rather than raising, so one oddly-encoded application name cannot abort
    the export.

    Raises:
        subprocess.CalledProcessError: if yarn exits with a non-zero status.
    """
    with open(os.devnull, 'w') as devnull:
        raw = subprocess.check_output(
            [YARN_BIN, 'application'] + list(args), stderr=devnull)
    return raw.decode('utf-8', 'replace')


def parse_application_ids(listing_text):
    """Return the application ids found in ``yarn application -list`` output.

    Only the first tab-separated column of each line is inspected, and only
    values starting with ``application_`` are kept; header, summary and blank
    lines are therefore dropped.  An empty listing yields an empty list.
    """
    app_ids = []
    for line in listing_text.splitlines():
        first_column = line.split('\t', 1)[0].strip()
        if first_column.startswith('application_'):
            app_ids.append(first_column)
    return app_ids


def parse_status(status_text):
    """Extract the ``STATUS_KEYS`` entries from a ``-status`` report.

    Each report line is split at its first colon; lines whose key is not in
    ``STATUS_KEYS`` are ignored.  Colons inside a value are replaced with
    underscores, which is what the earlier string-replace based parsing of
    this script produced for such values.

    Returns:
        dict mapping each key that was present to its stripped value.
    """
    info = {}
    for line in status_text.splitlines():
        key, separator, value = line.partition(':')
        key = key.strip()
        if separator and key in STATUS_KEYS:
            info[key] = value.strip().replace(':', '_')
    return info


def parse_allocation(value):
    """Parse ``'<n> MB-seconds, <m> vcore-seconds'`` into ``(n, m)`` ints.

    Raises:
        ValueError: if either number is missing or not an integer.
    """
    chunks = [chunk.split() for chunk in value.split(',')]
    try:
        return int(chunks[0][0]), int(chunks[1][0])
    except (IndexError, ValueError):
        raise ValueError(
            'unparseable resource allocation: {!r}'.format(value))


def normalise_app_name(name):
    """Collapse an application name into a low-cardinality tag value.

    Known prefixes map to a fixed label, names containing a cut marker are
    truncated before it, and everything else has its whitespace collapsed
    and is limited to ``MAX_NAME_LENGTH`` characters.
    """
    for prefix, label in NAME_PREFIXES:
        if name.startswith(prefix):
            return label
    for marker in NAME_CUT_MARKERS:
        if marker in name:
            return name.split(marker)[0]
    return ' '.join(name.split())[:MAX_NAME_LENGTH]


def build_point(app_id, status_text):
    """Build the InfluxDB point for one application.

    Args:
        app_id: id of the form ``application_<cluster>_<sequence>``.
        status_text: stdout of ``yarn application -status <app_id>``.

    Returns:
        dict with ``measurement``, ``fields`` and ``tags`` keys, ready to be
        passed (inside a list) to ``InfluxDBClient.write_points``.  The
        fields hold the aggregate MB-seconds and vcore-seconds as ints.

    Raises:
        ValueError: if the id is malformed, a required report entry is
            missing, or the resource allocation cannot be parsed.
    """
    id_parts = app_id.split('_')
    if len(id_parts) < 3:
        raise ValueError('malformed application id: {!r}'.format(app_id))

    info = parse_status(status_text)
    missing = [key for key in STATUS_KEYS if key not in info]
    if missing:
        raise ValueError(
            '{} status is missing {}'.format(app_id, ', '.join(missing)))

    mem_mb_seconds, vcore_seconds = parse_allocation(info[ALLOCATION_KEY])
    return {
        'measurement': MEASUREMENT,
        'fields': {
            'Resource-memMB': mem_mb_seconds,
            'Resource-vcore': vcore_seconds,
        },
        'tags': {
            'AppId': 'app_' + id_parts[2],
            'ApplicationType': info['Application-Type'],
            'ApplicationName': normalise_app_name(info['Application-Name']),
            'User': info['User'],
            'Queue': info['Queue'],
        },
    }


def main(argv=None):
    """Export the resource usage of every listed YARN application.

    With at least one command-line argument the points are printed as JSON
    instead of being written to InfluxDB (dry run).  A failure on a single
    application is logged and the remaining applications are still handled.
    """
    argv = sys.argv if argv is None else argv
    dry_run = len(argv) >= 2
    client = None
    if not dry_run:
        client = InfluxDBClient(host, port, user, password, dbname)

    for app_id in parse_application_ids(run_yarn('-list')):
        try:
            point = build_point(app_id, run_yarn('-status', app_id))
        except (subprocess.CalledProcessError, ValueError) as err:
            # The application may have vanished between -list and -status,
            # or its report may be incomplete; skip it rather than abort.
            logging.error(err)
            continue

        try:
            if dry_run:
                print(json.dumps([point]))
            else:
                client.write_points([point])
        except requests.exceptions.ConnectionError as errc:
            logging.error(errc)
        except InfluxDBClientError as erri:
            logging.error(erri)
        except UnicodeDecodeError as erru:
            logging.error(erru)
            logging.error(point)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment