Skip to content

Instantly share code, notes, and snippets.

@bossjones
Forked from laughingman7743/emr.py
Created April 30, 2019 19:43
Show Gist options
  • Save bossjones/37516cec047a2e2d17ac971a9b8e9238 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import boto3
# S3 location of the EMR script-runner JAR used to launch the Hive step.
SCRIPT_RUNNER_JAR = 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'
# Leading arguments for a Hive step run via script-runner; callers append the
# HQL script's S3 path (consumed by '-f') and any '-d' variable definitions.
BASE_ARGS_HIVE_STEP = [
'/usr/share/aws/emr/scripts/hive-script',
'--run-hive-script',
'--args',
'-f',
]
def create_cluster(region_name,
                   name,
                   log_uri,
                   release_label,
                   instance_type_master,
                   instance_count_master,
                   instance_type_slave,
                   instance_count_slave,
                   key_name,
                   subnet_id,
                   service_access_security_group,
                   master_security_group,
                   master_additional_security_group,
                   slave_security_group,
                   slave_additional_security_group,
                   hive_metastore_url,
                   hive_metastore_user,
                   hive_metastore_password,
                   bucket_name,
                   target_date):
    """Launch a transient EMR cluster that merges raw logs and runs a Hive script.

    The cluster auto-terminates when its steps finish
    (``KeepJobFlowAliveWhenNoSteps=False``).

    Returns the new cluster's JobFlowId so callers can poll its status.
    """
    client = boto3.client('emr', region_name=region_name)
    # Submit every step in the initial request.  The previous version created
    # the cluster with Steps=[] and auto-termination enabled, then attached
    # the steps via two add_job_flow_steps() calls -- a race: a step-less
    # cluster with KeepJobFlowAliveWhenNoSteps=False may begin terminating
    # before the steps are added.
    steps = (get_dist_cp_step(bucket_name, target_date) +
             get_hive_script_step(bucket_name, target_date))
    response = client.run_job_flow(
        Name=name,
        LogUri=log_uri,
        ReleaseLabel=release_label,
        Instances={
            'InstanceGroups': [
                {
                    'Name': 'emr-master',
                    'InstanceRole': 'MASTER',
                    'InstanceType': instance_type_master,
                    'InstanceCount': instance_count_master,
                },
                {
                    'Name': 'emr-core',
                    'InstanceRole': 'CORE',
                    'InstanceType': instance_type_slave,
                    'InstanceCount': instance_count_slave,
                },
            ],
            'Ec2KeyName': key_name,
            # Auto-terminate once the last step completes.
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2SubnetId': subnet_id,
            'ServiceAccessSecurityGroup': service_access_security_group,
            'EmrManagedMasterSecurityGroup': master_security_group,
            'AdditionalMasterSecurityGroups': master_additional_security_group,
            'EmrManagedSlaveSecurityGroup': slave_security_group,
            'AdditionalSlaveSecurityGroups': slave_additional_security_group,
        },
        Steps=steps,
        Applications=[
            {'Name': 'Hadoop'},
            {'Name': 'Hive'},
        ],
        Configurations=[
            {
                # Point Hive at an external (RDS) metastore so table metadata
                # outlives the transient cluster.
                'Classification': 'hive-site',
                'Properties': {
                    'javax.jdo.option.ConnectionURL': hive_metastore_url,
                    'javax.jdo.option.ConnectionUserName': hive_metastore_user,
                    'javax.jdo.option.ConnectionPassword': hive_metastore_password,
                },
            },
        ],
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
    )
    return response['JobFlowId']
def get_dist_cp_step(bucket_name, target_date):
    """Build the S3DistCp step that merges the raw logs for *target_date*.

    Returns a single-element list suitable for the ``Steps`` argument of
    the EMR job-flow API calls.
    """
    source = 's3://{0}/path/to/raw/log/{1}/'.format(
        bucket_name, target_date.strftime('%Y/%m/%d'))
    destination = 's3://{0}/path/to/merge/log/{1}/'.format(
        bucket_name, target_date.strftime('year=%Y/month=%m/day=%d'))
    step = {
        'Name': 'S3DistCp step',
        # Other choices: 'TERMINATE_JOB_FLOW' | 'TERMINATE_CLUSTER' |
        # 'CANCEL_AND_WAIT' | 'CONTINUE'
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': '/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar',
            'Args': [
                '--src', source,
                '--dest', destination,
                '--groupBy', "(.)*",
                '--targetSize', '2048',
            ],
        },
    }
    return [step]
def get_hive_script_step(bucket_name, target_date):
    """Build the Hive step that runs script.hql for *target_date*.

    The year/month/day components are handed to Hive as ``-d`` variables;
    returns a single-element list for the EMR ``Steps`` argument.
    """
    script_uri = 's3://{0}/path/to/hive-script/script.hql'.format(bucket_name)
    # Copy the shared prefix so the module-level constant is never mutated.
    hive_args = list(BASE_ARGS_HIVE_STEP)
    hive_args.append(script_uri)
    hive_args.extend(['-d', 'YEAR={0}'.format(target_date.year)])
    hive_args.extend(['-d', 'MONTH={0:02d}'.format(target_date.month)])
    hive_args.extend(['-d', 'DAY={0:02d}'.format(target_date.day)])
    return [{
        'Name': 'Hive script step',
        # Other choices: 'TERMINATE_JOB_FLOW' | 'TERMINATE_CLUSTER' |
        # 'CANCEL_AND_WAIT' | 'CONTINUE'
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': SCRIPT_RUNNER_JAR,
            'Args': hive_args,
        },
    }]
def check_cluster_status(region_name, jobflow_id, created_after):
    """Look up the status of the cluster identified by *jobflow_id*.

    Returns a ``(state, reason_message)`` tuple where *reason_message* is
    ``None`` when the state-change reason carries no message, or ``None``
    entirely when the cluster is not found among clusters created after
    *created_after*.
    """
    client = boto3.client('emr', region_name=region_name)
    # NOTE(review): list_clusters is paginated; a cluster beyond the first
    # page of results would be missed -- confirm result counts stay small,
    # or switch to a paginator.
    response = client.list_clusters(
        CreatedAfter=created_after,
    )
    for cluster in response['Clusters']:
        if cluster['Id'] != jobflow_id:
            continue
        status = cluster['Status']
        # dict.has_key() was removed in Python 3 and would raise
        # AttributeError here; .get() covers both presence and absence.
        return (status['State'], status['StateChangeReason'].get('Message'))
    return None
if __name__ == '__main__':
    import setting
    import sys
    import time
    from datetime import datetime, date, timedelta

    # Only clusters created within the last day are considered when polling.
    create_date = datetime.now() - timedelta(days=1)
    jobflow_id = create_cluster(setting.REGION_NAME,
                                setting.CLUSTER_NAME,
                                setting.LOG_URI,
                                setting.RELEASE_LABEL,
                                setting.INSTANCE_TYPE_MASTER,
                                setting.INSTANCE_COUNT_MASTER,
                                setting.INSTANCE_TYPE_SLAVE,
                                setting.INSTANCE_COUNT_SLAVE,
                                setting.KEY_NAME,
                                setting.SUBNET_ID,
                                setting.SECURITY_GROUP_SERVICE_ACCESS,
                                setting.SECURITY_GROUP_MASTER,
                                setting.SECURITY_GROUP_MASTER_ADDITIONAL,
                                setting.SECURITY_GROUP_SLAVE,
                                setting.SECURITY_GROUP_SLAVE_ADDITIONAL,
                                setting.HIVE_METASTORE_URL,
                                setting.HIVE_METASTORE_USER,
                                setting.HIVE_METASTORE_PASSWORD,
                                setting.BUCKET_NAME,
                                date(2016, 2, 11))
    # Poll once a minute until the cluster reaches TERMINATED, then report
    # success or failure based on the termination reason.
    while True:
        status = check_cluster_status(setting.REGION_NAME,
                                      jobflow_id,
                                      create_date)
        if not (status and status[0] == 'TERMINATED'):
            time.sleep(60)
            continue
        if status[1] == 'Steps completed':
            print('complete')
            break
        print('error')
        sys.exit(1)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# setting.py: configuration constants consumed by the EMR launcher script.
# AWS region the EMR client targets.
REGION_NAME = 'ap-northeast-1'
# Display name for the cluster.
CLUSTER_NAME = 'Cluster Name'
# S3 prefix where EMR writes cluster and step logs.
LOG_URI = 's3://path/to/emr/log/'
# EC2 key pair name for the cluster's instances.
KEY_NAME = 'YOUR_KEY_NAME'
# EMR release label (determines bundled Hadoop/Hive versions).
RELEASE_LABEL = 'emr-4.2.0'
INSTANCE_TYPE_MASTER = 'm1.medium'
INSTANCE_COUNT_MASTER = 1
INSTANCE_TYPE_SLAVE = 'm1.medium'
INSTANCE_COUNT_SLAVE = 1
# VPC subnet the cluster is launched into.
SUBNET_ID = 'YOUR_SUBNET_ID'
SECURITY_GROUP_SERVICE_ACCESS = 'YOUR_SECURITY_GROUP'
SECURITY_GROUP_MASTER = 'YOUR_SECURITY_GROUP'
# The *_ADDITIONAL values are lists: they are passed straight to the
# Additional{Master,Slave}SecurityGroups fields of run_job_flow.
SECURITY_GROUP_MASTER_ADDITIONAL = ['YOUR_SECURITY_GROUP']
SECURITY_GROUP_SLAVE = 'YOUR_SECURITY_GROUP'
SECURITY_GROUP_SLAVE_ADDITIONAL = ['YOUR_SECURITY_GROUP']
# JDBC URL and credentials for the external Hive metastore (RDS MySQL).
HIVE_METASTORE_URL = 'jdbc:mysql://YOUR_RDS_URL/hive?createDatabaseIfNotExist=true'
HIVE_METASTORE_USER = 'YOUR_RDS_USER'
HIVE_METASTORE_PASSWORD = 'YOUR_RDS_PASSWORD'
# S3 bucket holding the raw logs, merged output, and the Hive script.
BUCKET_NAME = 'YOUR_BUCKET_NAME'
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment