Skip to content

Instantly share code, notes, and snippets.

@conradlee
Created January 30, 2019 09:46
Show Gist options
  • Save conradlee/11996418691e9272c181fe21bed8712c to your computer and use it in GitHub Desktop.
Save conradlee/11996418691e9272c181fe21bed8712c to your computer and use it in GitHub Desktop.
A Python script that kills any YARN application on EMR that runs for too long
# Copyright 2019 Parsely
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Author: Conrad Lee <conrad@parsely.com>
import json
import subprocess
import time
import urllib
from xml.etree import ElementTree
import yarn_api_client
# Longest an application may run, in seconds (3 hours). The main loop kills
# the longest-running app once its elapsed time exceeds this value.
MAX_JOB_DURATION_SECONDS = 60 * 60 * 3
# Number of seconds the main loop sleeps between successive checks.
POLLING_INTERVAL_SECONDS = 30
def kill_yarn_app(app_id):
    """Kill the YARN application *app_id* using the ``yarn`` command-line tool.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status (``check=True``).
    """
    subprocess.run(
        ["/usr/bin/yarn", "application", "-kill", app_id],
        check=True,
    )
def get_resource_manager_addr(yarn_conf_path="/etc/hadoop/conf/yarn-site.xml"):
    """Read the ResourceManager address from a YARN site configuration file.

    Args:
        yarn_conf_path: Path of the XML configuration file to parse.

    Returns:
        A ``(host, port)`` tuple with ``port`` converted to ``int``, or
        ``(None, None)`` when the file has no
        ``yarn.resourcemanager.address`` property with a non-empty value.

    Raises:
        ValueError: If the configured value is not of the form ``host:port``.
    """
    dom = ElementTree.parse(yarn_conf_path)
    address = None
    for setting in dom.findall('property'):
        # findtext() returns None when the child element is absent, so a
        # <property> without a <name> is skipped instead of raising
        # AttributeError on ``None.text``.
        name = (setting.findtext('name') or '').strip()
        if name != "yarn.resourcemanager.address":
            continue
        # No early exit: if the property is defined more than once, the last
        # definition wins. Surrounding whitespace/newlines are dropped so they
        # do not end up inside the host name.
        address = (setting.findtext('value') or '').strip() or None
    if address is None:
        return None, None
    host, port = address.split(":")
    return host, int(port)
def get_longest_running_yarn_app(host, port=8088):
    """Return the RUNNING YARN application with the largest ``elapsedTime``.

    Args:
        host: Address handed to the ResourceManager client.
        port: Port handed to the ResourceManager client.

    Returns:
        The application record with the greatest ``'elapsedTime'`` value, or
        ``None`` when the ResourceManager reports no running applications.
    """
    rm = yarn_api_client.resource_manager.ResourceManager(address=host, port=port)
    apps_data = rm.cluster_applications(state="RUNNING").data
    # 'apps' is None when nothing is running; a missing or empty 'apps'/'app'
    # entry is treated the same way instead of raising KeyError/IndexError.
    running_apps = (apps_data.get('apps') or {}).get('app') or []
    if not running_apps:
        return None
    # max() picks the first record with the largest elapsed time, which is the
    # same record a stable descending sort would put first, without mutating
    # the response data.
    return max(running_apps, key=lambda app: app['elapsedTime'])
def get_ec2_instance_id():
    """Return the instance id served by the EC2 instance metadata endpoint.

    Returns:
        The response body decoded to ``str``.

    Raises:
        urllib.error.URLError: If the endpoint cannot be reached or answers
            with an HTTP error. A stalled read may instead surface as a
            timeout error.
    """
    # A bare ``import urllib`` does not load the ``request`` submodule, so it
    # is imported explicitly here rather than relying on some other module
    # having imported it first.
    import urllib.request

    url = 'http://169.254.169.254/latest/meta-data/instance-id'
    # Bounded wait: fail instead of hanging indefinitely when the metadata
    # endpoint is unreachable. The ``with`` block closes the response.
    with urllib.request.urlopen(url, timeout=10) as response:
        return response.read().decode()
def current_node_master():
    """Return a boolean indicating whether the node on which this function
    is executed is the EMR master node.

    The check compares this machine's EC2 instance id with the
    ``masterInstanceId`` entry of the cluster configuration.
    """
    return get_ec2_instance_id() == get_cluster_config()['masterInstanceId']
def get_cluster_config(config_path="/mnt/var/lib/info/job-flow.json"):
    """Load the cluster's job-flow description from a JSON file.

    Args:
        config_path: Path of the JSON file to read. Defaults to the location
            this script has always read from, so existing callers are
            unaffected.

    Returns:
        The decoded JSON document.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    with open(config_path, encoding="utf-8") as configfile:
        # json.load tolerates leading/trailing whitespace, so no explicit
        # strip of the file contents is needed.
        return json.load(configfile)
if __name__ == "__main__":
    # Only the EMR master node polls YARN; on any other node the script
    # simply exits.
    if current_node_master():
        # The port parsed from the configuration is not used below: the
        # lookup is called with its own default port.
        host, port = get_resource_manager_addr()
        while True:
            longest_app = get_longest_running_yarn_app(host)
            if longest_app is None:
                print("No yarn apps currently running.")
            else:
                # 'elapsedTime' is divided by 1000 before being compared
                # against the limit, which is expressed in seconds.
                elapsed_time_seconds = longest_app['elapsedTime'] / 1000
                if elapsed_time_seconds <= MAX_JOB_DURATION_SECONDS:
                    print("Longest app duration is {} so not terminating anything".format(elapsed_time_seconds))
                else:
                    print("Longest app duration is {} so terminating it.".format(elapsed_time_seconds))
                    kill_yarn_app(longest_app['id'])
            # Wait before the next check, whatever happened on this pass.
            time.sleep(POLLING_INTERVAL_SECONDS)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment