Skip to content

Instantly share code, notes, and snippets.

@fabrizioc1
Created August 10, 2018 18:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fabrizioc1/ac219511489088c48aba944ee4e98203 to your computer and use it in GitHub Desktop.
Save fabrizioc1/ac219511489088c48aba944ee4e98203 to your computer and use it in GitHub Desktop.
Execute a script on each EMR cluster node
import os
import boto3
import subprocess
import configargparse
# Remote login name used unless --ssh-user overrides it.
SSH_USER = "hadoop"

# Shell command line for one remote invocation, filled in with str.format().
# Fields: ssh_key (identity file), ssh_user, host, cmd (the remote command,
# wrapped in double quotes by the template).
SSH_TEMPLATE = (
    'ssh -o StrictHostKeyChecking=no'
    ' -i {ssh_key}'
    ' {ssh_user}@{host}'
    ' "{cmd}"'
)

# Directory in which "<key name>.pem" is looked up when no key path is given.
SSH_KEY_PATH = os.path.expanduser('~/.ssh')
def find_ssh_key(emr, config):
    """Return the path of the SSH private key to use for the cluster.

    A truthy ``config.ssh_key`` is returned unchanged. Otherwise the cluster
    identified by ``config.cluster_id`` is described through ``emr`` and the
    result is ``<Ec2KeyName>.pem`` joined onto ``SSH_KEY_PATH``.

    Args:
        emr: client object exposing ``describe_cluster(ClusterId=...)``.
        config: object with ``ssh_key`` and ``cluster_id`` attributes.

    Returns:
        The key file path as a string.
    """
    explicit_key = config.ssh_key
    if explicit_key:
        return explicit_key

    # No key supplied: derive the file name from the cluster's EC2 key name.
    cluster = emr.describe_cluster(ClusterId=config.cluster_id)['Cluster']
    key_name = cluster['Ec2InstanceAttributes']['Ec2KeyName']
    return os.path.join(SSH_KEY_PATH, key_name + ".pem")
def find_ip_addresses(emr, config):
    """Return the private IP address of every RUNNING instance in the cluster.

    ``list_instances`` returns its results one page at a time; a ``Marker``
    in the response means more pages remain. All pages are followed so that
    large clusters are not silently truncated to the first page.

    Args:
        emr: client object exposing ``list_instances(ClusterId=..., Marker=...)``.
        config: object with a ``cluster_id`` attribute.

    Returns:
        A list of ``PrivateIpAddress`` values, in the order the service
        returned them, for instances whose ``Status.State`` is ``RUNNING``.
    """
    request = {'ClusterId': config.cluster_id}
    ip_addresses = []
    while True:
        response = emr.list_instances(**request)
        ip_addresses.extend(
            instance['PrivateIpAddress']
            for instance in response['Instances']
            if instance['Status']['State'] == 'RUNNING'
        )
        marker = response.get('Marker')
        if not marker:
            return ip_addresses
        # Ask for the page that follows the one just consumed.
        request['Marker'] = marker
def execute_script(ssh_cmd):
    """Run ``ssh_cmd`` through the local shell and print its output.

    The command's stderr is merged into its stdout so that, when the command
    fails, the text printed under "stderr:" actually contains the error
    messages (previously only stdout was captured). Output is read in text
    mode so it prints as text rather than as a bytes repr on Python 3.

    Args:
        ssh_cmd: a complete shell command line, as a single string.

    Prints:
        "stdout:" followed by the combined output when the command exits
        with status 0; "stderr:" followed by the combined output otherwise.
        A non-zero exit status is reported this way and not raised.
    """
    try:
        ssh_output = subprocess.check_output(
            ssh_cmd,
            # ssh_cmd is a ready-made shell command line, so it has to go
            # through the shell; callers are responsible for its quoting.
            shell=True,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
    except subprocess.CalledProcessError as e:
        print("stderr:")
        print(e.output)
    else:
        print("stdout:")
        print(ssh_output)
if __name__ == "__main__":
    # Command-line options. --debug is parsed but not read in this block.
    parser = configargparse.ArgParser()
    parser.add_argument('--cluster-id', required=True, help='EMR cluster id')
    parser.add_argument('--script', required=True, help='shell script to execute')
    parser.add_argument('--ssh-key', required=False, help='EMR cluster ssh key')
    parser.add_argument('--ssh-user', default=SSH_USER, help='EMR cluster ssh user')
    parser.add_argument('--debug', action='store_true', help='debugging flag')
    parser.add_argument('--region', default='us-west-1', help='AWS region')
    options = parser.parse_args()

    emr_client = boto3.client('emr', region_name=options.region)
    key_path = find_ssh_key(emr_client, options)

    # Run the same command on each running instance, one after the other.
    for host in find_ip_addresses(emr_client, options):
        command = SSH_TEMPLATE.format(
            ssh_key=key_path,
            ssh_user=options.ssh_user,
            host=host,
            cmd=options.script,
        )
        print("{ip_address}:".format(ip_address=host))
        execute_script(command)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment