Skip to content

Instantly share code, notes, and snippets.

@matthewpick
Last active June 17, 2021 20:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matthewpick/39a4a308b590d29e46c523770a972783 to your computer and use it in GitHub Desktop.
Save matthewpick/39a4a308b590d29e46c523770a972783 to your computer and use it in GitHub Desktop.
EMR Cluster - Quickly open Hadoop UI and Spark UI for all application ids
import logging
import boto3
import webbrowser
logger = logging.getLogger(__name__)
def main():
master_instance_type = 'm5.xlarge'
logger.info("Must be on VPN to view webpages!")
clusters = find_clusters(name='production-streaming-cluster')
for cluster in clusters:
app_ids = find_application_ids(cluster['Id'])
ec2_instances = find_cluster_ec2_instances(cluster['Id'])
primary_instance = [instance for instance in ec2_instances if instance['InstanceType'] == master_instance_type][0]
dns_name = primary_instance['PrivateDnsName']
for app_id in app_ids:
spark_url = f'http://{dns_name}:20888/proxy/{app_id}/streaming/'
hadoop_url = f'http://{dns_name}:8088/cluster/app/{app_id}/'
logger.info("Opening Spark UI %s", spark_url)
logger.info("Opening Hadoop UI %s", hadoop_url)
webbrowser.open(spark_url)
webbrowser.open(hadoop_url)
def find_cluster_ec2_instances(job_flow_id):
client = boto3.client('ec2')
custom_filter = [{
'Name': 'tag:aws:elasticmapreduce:job-flow-id',
'Values': [job_flow_id]}]
response = client.describe_instances(Filters=custom_filter)
# NOTE: May need extra work for non-spot instances! e.g. "Reservations" key is specific to spot instances
for group in response.get('Reservations', []):
for instance in group.get('Instances', []):
yield instance
def find_application_ids(cluster_id):
emr_client = boto3.client('emr')
cluster_details = emr_client.describe_cluster(ClusterId=cluster_id)
s3_path_parts = cluster_details['Cluster']['LogUri'].replace('s3://', '').replace('s3n://', '').split('/')
bucket_name = s3_path_parts[0]
sub_path = "/".join(s3_path_parts[1:-1])
containers_sub_path = f'{sub_path}/{cluster_id}/containers/'
s3_client = boto3.client('s3')
result = s3_client.list_objects(Bucket=bucket_name, Prefix=containers_sub_path, Delimiter='/')
for prefix in result.get('CommonPrefixes', []):
app_id = prefix['Prefix'].split('/')[-2]
yield app_id
def find_clusters(name=None):
client = boto3.client('emr')
clusters = client.list_clusters(ClusterStates=['RUNNING', 'WAITING']).get('Clusters', [])
if not clusters:
logger.info('No Clusters RUNNING')
return None
for cluster in clusters:
if name:
if cluster['Name'] == name:
yield cluster
else:
yield cluster
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment