Skip to content

Instantly share code, notes, and snippets.

@conradlee
Created January 30, 2019 14:04
Show Gist options
  • Save conradlee/6abcaceb2dc21ab48d613d7befa80c4f to your computer and use it in GitHub Desktop.
Save conradlee/6abcaceb2dc21ab48d613d7befa80c4f to your computer and use it in GitHub Desktop.
Kill YARN apps on EMR clusters that run longer than a specified amount of time
# Copyright 2019 Parsely
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Author: Conrad Lee <conrad@parsely.com>
# Consider running this via cron
import json
import os
import subprocess
import urllib
from xml.etree import ElementTree
import yarn_api_client
# Maximum lifetime of a YARN app, in seconds, before it's killed (3 hours).
MAX_JOB_DURATION_SECONDS = 3 * 60 * 60
def kill_yarn_app(app_id):
    """Kill a YARN application by shelling out to the ``yarn`` CLI.

    Args:
        app_id: Id of the YARN application to kill.

    Raises:
        subprocess.CalledProcessError: if ``yarn application -kill`` exits
            with a non-zero status (``check=True``).
    """
    # Argument list (no shell), so app_id is never interpreted by a shell.
    subprocess.run(
        ["/usr/bin/yarn", "application", "-kill", app_id],
        check=True,
    )
def get_resource_manager_addr(yarn_conf_path="/etc/hadoop/conf/yarn-site.xml"):
    """Return the ResourceManager hostname configured in ``yarn-site.xml``.

    Reads the ``yarn.resourcemanager.address`` property, whose value is
    expected to look like ``host:port``, and returns only the host part.
    A value without a port is returned as-is.

    Args:
        yarn_conf_path: Path to the YARN site configuration file.

    Returns:
        The hostname string, or ``None`` if the file does not exist, does not
        define ``yarn.resourcemanager.address``, or defines it with an empty
        value.
    """
    # The config file may be absent; report that instead of raising.
    if not os.path.isfile(yarn_conf_path):
        return None

    address = None
    for prop in ElementTree.parse(yarn_conf_path).findall("property"):
        # findtext() returns None for a missing <name> instead of raising.
        name = prop.findtext("name")
        if name is not None and name.strip() == "yarn.resourcemanager.address":
            # No break: if the key appears more than once, the last one wins.
            address = prop.findtext("value")

    if address is None or not address.strip():
        return None
    address = address.strip()

    # Drop the trailing ":port". rpartition tolerates a value with no port,
    # where the old two-way split(":") unpacking raised ValueError.
    host, sep, _port = address.rpartition(":")
    return host if sep else address
def get_longest_running_yarn_app(host, port=8088):
    """Return the RUNNING YARN application with the largest ``elapsedTime``.

    Args:
        host: Hostname of the YARN ResourceManager.
        port: Port passed to the ResourceManager client (8088 is the usual
            ResourceManager web/REST port).

    Returns:
        The application's dict as reported by the ResourceManager, or ``None``
        if no application is running.
    """
    rm = yarn_api_client.resource_manager.ResourceManager(address=host, port=port)
    apps_data = rm.cluster_applications(state="RUNNING").data

    # "apps" is None (or absent) when nothing is running.
    apps = apps_data.get("apps")
    if not apps:
        return None
    running_apps = apps.get("app") or []

    # max() is O(n) and, unlike sort() + [0], does not raise IndexError on an
    # empty list.
    return max(running_apps, key=lambda app: app["elapsedTime"], default=None)
def get_instance_id(aws_endpoint="http://169.254.169.254/latest/meta-data/instance-id", timeout=5):
    """Return this machine's EC2 instance id from the instance metadata endpoint.

    Args:
        aws_endpoint: URL that returns the instance id as the response body.
        timeout: Seconds to wait for the endpoint before giving up, so a cron
            run cannot hang indefinitely when the endpoint is unreachable.

    Returns:
        The response body decoded as a string.

    Raises:
        OSError: (e.g. ``urllib.error.URLError`` or a timeout) if the endpoint
            cannot be reached.
    """
    # NOTE(review): this is a plain unauthenticated GET (IMDSv1 style); if the
    # cluster enforces IMDSv2 tokens the request will be rejected. Confirm.
    # "import urllib" alone does not load the urllib.request submodule.
    import urllib.request

    # The context manager closes the response after reading it.
    with urllib.request.urlopen(aws_endpoint, timeout=timeout) as response:
        return response.read().decode()
def current_node_master():
    """Return True if this code is running on the EMR cluster's master node.

    Compares this machine's instance id with the ``masterInstanceId`` field
    of the EMR job-flow configuration.
    """
    # The instance id is fetched first, before the cluster config is read.
    this_instance = get_instance_id()
    master_instance = get_cluster_config()["masterInstanceId"]
    return master_instance == this_instance
def get_cluster_config(path="/mnt/var/lib/info/job-flow.json"):
    """Load and parse the EMR job-flow configuration JSON file.

    Args:
        path: Location of the job-flow JSON file. Defaults to the path this
            script expects on EMR nodes; override it to read a different copy.

    Returns:
        The parsed JSON document (e.g. ``current_node_master`` reads its
        ``masterInstanceId`` key).
    """
    # Explicit encoding so parsing does not depend on the cron job's locale.
    with open(path, encoding="utf-8") as configfile:
        # json.load tolerates surrounding whitespace, so no manual strip().
        return json.load(configfile)
if __name__ == "__main__":
    # Act only on the EMR master node; on any other node this does nothing.
    if current_node_master():
        host = get_resource_manager_addr()
        # Guard: None means yarn-site.xml is missing or lacks the RM address.
        # It must not be passed on as a hostname.
        if host is not None:
            # Only the single longest-running app is examined per invocation.
            # Other overdue apps presumably get caught on later (e.g. cron) runs.
            app = get_longest_running_yarn_app(host)
            if app is not None:
                # elapsedTime is divided by 1000, i.e. treated as milliseconds.
                elapsed_time_seconds = app["elapsedTime"] / 1000
                if elapsed_time_seconds > MAX_JOB_DURATION_SECONDS:
                    kill_yarn_app(app["id"])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment