Skip to content

Instantly share code, notes, and snippets.

@onefoursix
Last active June 13, 2019 02:19
Show Gist options
  • Save onefoursix/b16bbf70afb5419b2fd1486983b02749 to your computer and use it in GitHub Desktop.
Save onefoursix/b16bbf70afb5419b2fd1486983b02749 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
## ********************************************************************************
## mr-usage-by-user.py
##
## Aggregates YARN MapReduce usage by day and user and writes the results to the console and to a file
##
## As the CM-API call "yarn.get_yarn_applications" can only return 1000 jobs max per call the script will make
## multiple calls to yarn.get_yarn_applications and aggregate all results between the script's global start and end times
##
## The time window batch size for the start and end times in the call to yarn.get_yarn_applications is
## set in the variable batch_time_interval and has a default value of 1 hour
## The value should be set to an interval within which fewer than 1000 apps are run
##
## Dependencies: Requires the modules: pytz and tzlocal
## Those modules need to be installed on the machine running the script using commands like:
## $ sudo pip install pytz
## $ sudo pip install tzlocal
##
## Usage: ./mr-usage-by-user.py [<END_DATE> [<NUM_DAYS>]]
##
## Args: END_DATE (optional) - Sets the end date for the YARN history to be reported on. Defaults to the current day
## Date should be formatted as YYYY-mm-dd
## NUM_DAYS (optional) - Set to the number of days of YARN usage to report on. Defaults to 7 days
##
## Examples:
## Report on 7 days activity ending today:
## ./mr-usage-by-user.py
##
## Report on 7 days activity ending 2016-04-01:
## ./mr-usage-by-user.py 2016-04-01
##
## Report on 3 days activity ending 2016-04-01:
## ./mr-usage-by-user.py 2016-04-01 3
##
## Edit the settings below to connect to your Cluster
##
## ********************************************************************************
import sys
from datetime import time, datetime, timedelta
from sets import Set
import pytz
import tzlocal
from cm_api.api_client import ApiResource
## Settings to connect to the cluster
cm_host = "<YOUR CM HOST>"
cm_port = "7180"
cm_login = "admin"
cm_password = "admin"
cluster_name = "<YOUR CLUSTER NAME>"

## Report output file, named for the day the script runs (hardcoded pattern)
filename = "mr-usage-by-user-%s.csv" % datetime.today().date()
## Needed for python < v2.7 (timedelta.total_seconds() was added in 2.7)
def total_seconds(td):
    ## Convert the timedelta to whole seconds, flooring any fractional part,
    ## exactly as the original long()/10**6 integer division did.
    microseconds = td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6
    return microseconds // 10**6
## Set the appropriate timezone. I will use the local timezone
## (used later to convert the UTC app.startTime values returned by CM
## into local calendar dates for per-day aggregation)
local_timezone = tzlocal.get_localzone()
## Set to True for verbose output
DEBUG = False
## Check Command Line Args
## Usage: ./mr-usage-by-user.py [<END_DATE> [<NUM_DAYS>]]
## Both arguments are optional; END_DATE defaults to today, NUM_DAYS to 7.
if len(sys.argv) > 3:
    print("Error: Wrong number of arguments")
    print("Usage: ./mr-usage-by-user.py [<END_DATE> [<NUM_DAYS>]]")
    print(" Args: END_DATE (optional) - Sets the end date for the YARN history to be reported on. Defaults to the current day")
    print(" Date should be formatted as YYYY-mm-dd")
    print(" NUM_DAYS (optional) - Set to the number of days of YARN usage to report on. Defaults to 7 days")
    print("")
    print("Examples:")
    print(" Report on 7 days activity ending today:")
    print(" ./mr-usage-by-user.py")
    print("")
    print(" Report on 7 days activity ending 2016-04-01:")
    print(" ./mr-usage-by-user.py 2016-04-01")
    print("")
    print(" Report on 3 days activity ending 2016-04-01:")
    print(" ./mr-usage-by-user.py 2016-04-01 3")
    print("\n\n")
    ## Fix: use sys.exit rather than quit(); quit() is injected by the
    ## site module for interactive use and may not exist when scripted.
    sys.exit(1)

## end_date - fail with a friendly message instead of a raw traceback
## when the date is malformed.
end_date = None
if len(sys.argv) > 1:
    try:
        end_date = datetime.strptime(sys.argv[1], '%Y-%m-%d')
    except ValueError:
        print("Error: END_DATE must be formatted as YYYY-mm-dd")
        sys.exit(1)
if end_date is None:
    end_date = datetime.today().date()

## num_days - likewise validate rather than crash on a non-integer.
num_days = None
if len(sys.argv) > 2:
    try:
        num_days = int(sys.argv[2])
    except ValueError:
        print("Error: NUM_DAYS must be an integer")
        sys.exit(1)
if num_days is None:
    num_days = 7
## Set the start and end times for the reporting window:
## midnight NUM_DAYS before END_DATE through the last microsecond of END_DATE.
start_time = datetime.combine(end_date, datetime.min.time()) - timedelta(days=num_days)
end_time = datetime.combine(end_date, datetime.max.time())
if DEBUG:
    print("\n\nDEBUG start_time = %s" % start_time)
    print("DEBUG end_time = %s\n\n" % end_time)
## Connect to CM
api = ApiResource(server_host=cm_host, server_port=cm_port, username=cm_login, password=cm_password)

## Get the Cluster - match on display name; bail out if it is not found
cluster = None
for c in api.get_all_clusters():
    if c.displayName == cluster_name:
        cluster = c
        break
if cluster is None:
    print("\nError: Cluster '" + cluster_name + "' not found")
    ## Fix: sys.exit instead of quit() - quit() is only guaranteed in
    ## interactive sessions (it is injected by the site module).
    sys.exit(1)
print("\n\nConnected to Cloudera Manager on " + cm_host + ":" + cm_port)
## Get YARN Service - find the first service of type YARN on the cluster
yarn = None
for service in cluster.get_all_services():
    if service.type == "YARN":
        yarn = service
        break
if yarn is None:
    print("Error: Could not locate YARN Service")
    ## Fix: sys.exit instead of quit() - quit() is only guaranteed in
    ## interactive sessions (it is injected by the site module).
    sys.exit(1)
print("\nGetting YARN History for Cluster \'" + cluster_name + "\' from " + str(start_time.date()) + " to " + str(end_time.date()))

## Dictionary of jobs keyed day -> user -> applicationId -> attribute dict
jobs = {}

## Define a time window for each call to yarn.get_yarn_applications
## The interval should be set so that fewer than 1000 jobs execute within
## the time window (the API caps each response at 1000 applications).
## I'll hardcode it here for 1 hour
batch_time_interval = timedelta(minutes=60 * 1)

## Track application IDs already seen; consecutive batches share a boundary
## instant, so the same app can legitimately appear in two responses.
apps_processed = set()

batch_end_time = start_time
while batch_end_time < end_time:
    ## set the start and end time for each batch
    start_time = batch_end_time
    batch_end_time = batch_end_time + batch_time_interval
    if batch_end_time > end_time:
        batch_end_time = end_time

    ## We'll keep track of the number of successful apps we count per batch
    number_of_successful_apps_per_batch = 0

    ## Get YARN Applications for this window (limit 1000 is the API maximum)
    response = yarn.get_yarn_applications(start_time, batch_end_time, filter_str='', limit=1000, offset=0)

    ## For each job that has a state of "SUCCEEDED", add the job to the
    ## dictionary of jobs per day per user
    for app in response.applications:
        if app.state != "SUCCEEDED":
            continue
        ## BUG FIX: the original used 'break' here, which abandoned the rest
        ## of the batch as soon as one duplicate app was encountered.
        ## 'continue' skips only the duplicate.
        if app.applicationId in apps_processed:
            continue
        ## This is the first time we've seen this app; add it to the processed set
        apps_processed.add(app.applicationId)
        number_of_successful_apps_per_batch = number_of_successful_apps_per_batch + 1

        user = app.user
        appId = app.applicationId
        ## CM returns UTC timestamps; convert to the local date for bucketing
        appDate = app.startTime.replace(tzinfo=pytz.utc).astimezone(local_timezone).date()

        ## Create the nested day/user/app dictionaries on first sight
        if appDate not in jobs:
            jobs[appDate] = {}
        if user not in jobs[appDate]:
            jobs[appDate][user] = {}
        if appId not in jobs[appDate][user]:
            jobs[appDate][user][appId] = {}

        ## Add the job's attributes to each day's user's job's dictionary
        jobs[appDate][user][appId]["name"] = app.name
        jobs[appDate][user][appId]["pool"] = app.pool
        jobs[appDate][user][appId]["startTime"] = app.startTime
        jobs[appDate][user][appId]["endTime"] = app.endTime
        jobs[appDate][user][appId]["application_duration"] = total_seconds(app.endTime - app.startTime)
        jobs[appDate][user][appId]["cpu_milliseconds"] = long(app.attributes["cpu_milliseconds"])
        jobs[appDate][user][appId]["physical_memory_bytes"] = long(app.attributes["physical_memory_bytes"])

        if DEBUG:
            print("\n\n-- DEBUG --------------")
            print("adding job to job list for " + str(appDate) + " " + user + " " + appId)
            print("name: " + app.name)
            print("pool: " + app.pool)
            print("startTime: " + str(app.startTime))
            print("endTime: " + str(app.endTime))
            print("duration: " + str(total_seconds(app.endTime - app.startTime)))
            print("cpu: " + str(app.attributes["cpu_milliseconds"]))
            print("memory: " + str(app.attributes["physical_memory_bytes"]))

    if number_of_successful_apps_per_batch > 0:
        print("Retrieved " + str(number_of_successful_apps_per_batch) + " successfully completed apps between " + str(start_time) + " and " + str(batch_end_time))
print("\n\n")
print("Aggregated results by day and user")
print("\n\n")

## Write the aggregated report to both the console and the CSV file.
## 'with' guarantees the file is closed even if a write fails.
with open(filename, 'w') as report_file:
    report_file.write("Date,User,#Jobs,Duration(secs),CPU(secs),Memory(MB)\n")
    print("Date User #Jobs Duration(secs) CPU(secs) Memory(MB)")
    print("--------------------------------------------------------------------------------------")
    for the_date in sorted(jobs.keys()):
        for the_user in sorted(jobs[the_date].keys()):
            user_jobs = jobs[the_date][the_user]
            num_jobs = len(user_jobs)
            ## Aggregate duration, CPU and memory over the user's jobs that day
            duration = sum(job["application_duration"] for job in user_jobs.values())
            cpu = sum(job["cpu_milliseconds"] for job in user_jobs.values())
            memory = sum(job["physical_memory_bytes"] for job in user_jobs.values())

            dateStr = str(the_date)
            numJobsStr = ("%0.0f" % num_jobs)
            durationStr = ("%0.0f" % (duration))  # already whole seconds
            ## BUG FIX: use float division so "%0.0f" actually rounds to the
            ## nearest second / MB; the original integer division floored the
            ## value before the format spec could round it.
            cpuStr = ("%0.0f" % (cpu / 1000.0))  # round to nearest second
            memoryStr = ("%0.0f" % (memory / (1024.0 * 1024)))  # round to MB

            report_file.write(dateStr + "," + the_user + "," + numJobsStr + "," + durationStr + "," + cpuStr + "," + memoryStr + "\n")
            print(dateStr + "\t" + the_user + "\t" + numJobsStr.rjust(10) + "\t" + durationStr.rjust(10) + "\t" + cpuStr.rjust(10) + "\t" + memoryStr.rjust(10))
print("\n\n")
print("Report output saved to file: " + filename)
print("\n\n")
print("Done\n\n\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment