Skip to content

Instantly share code, notes, and snippets.

@laughingman7743
Last active May 23, 2019 16:17
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save laughingman7743/5c675c9b1d9ed02539e6 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import boto3

# Regional script-runner JAR that EMR uses to execute scripts as job-flow steps.
SCRIPT_RUNNER_JAR = 's3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'

# Fixed leading arguments for every Hive-script step; callers append the
# script's S3 path (consumed by '-f') and any '-d' substitution variables.
BASE_ARGS_HIVE_STEP = [
    '/usr/share/aws/emr/scripts/hive-script',
    '--run-hive-script',
    '--args',
    '-f',
]
def create_cluster(region_name,
                   name,
                   log_uri,
                   release_label,
                   instance_type_master,
                   instance_count_master,
                   instance_type_slave,
                   instance_count_slave,
                   key_name,
                   subnet_id,
                   service_access_security_group,
                   master_security_group,
                   master_additional_security_group,
                   slave_security_group,
                   slave_additional_security_group,
                   hive_metastore_url,
                   hive_metastore_user,
                   hive_metastore_password,
                   bucket_name,
                   target_date):
    """Launch an EMR cluster configured for Hadoop + Hive and queue the
    S3DistCp and Hive-script steps for *target_date*.

    Returns the new cluster's JobFlowId string.
    """
    emr = boto3.client('emr', region_name=region_name)

    # One master group and one core (worker) group.
    instance_groups = [
        {
            'Name': 'emr-master',
            'InstanceRole': 'MASTER',
            'InstanceType': instance_type_master,
            'InstanceCount': instance_count_master,
        },
        {
            'Name': 'emr-core',
            'InstanceRole': 'CORE',
            'InstanceType': instance_type_slave,
            'InstanceCount': instance_count_slave,
        },
    ]

    # Point Hive's metastore at an external JDBC database so table
    # definitions outlive the (auto-terminating) cluster.
    hive_site = {
        'Classification': 'hive-site',
        'Properties': {
            'javax.jdo.option.ConnectionURL': hive_metastore_url,
            'javax.jdo.option.ConnectionUserName': hive_metastore_user,
            'javax.jdo.option.ConnectionPassword': hive_metastore_password,
        },
    }

    response = emr.run_job_flow(
        Name=name,
        LogUri=log_uri,
        ReleaseLabel=release_label,
        Instances={
            'InstanceGroups': instance_groups,
            'Ec2KeyName': key_name,
            # Shut the cluster down automatically once all steps finish.
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
            'Ec2SubnetId': subnet_id,
            'ServiceAccessSecurityGroup': service_access_security_group,
            'EmrManagedMasterSecurityGroup': master_security_group,
            'AdditionalMasterSecurityGroups': master_additional_security_group,
            'EmrManagedSlaveSecurityGroup': slave_security_group,
            'AdditionalSlaveSecurityGroups': slave_additional_security_group,
        },
        # Steps are attached after launch via add_job_flow_steps below.
        Steps=[],
        Applications=[
            {'Name': 'Hadoop'},
            {'Name': 'Hive'},
        ],
        Configurations=[hive_site],
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole',
    )

    job_flow_id = response['JobFlowId']
    # Same order as before: merge the raw logs first, then run the Hive script.
    emr.add_job_flow_steps(
        JobFlowId=job_flow_id,
        Steps=get_dist_cp_step(bucket_name, target_date),
    )
    emr.add_job_flow_steps(
        JobFlowId=job_flow_id,
        Steps=get_hive_script_step(bucket_name, target_date),
    )
    return job_flow_id
def get_dist_cp_step(bucket_name, target_date):
    """Build the S3DistCp step definition that merges one day's raw logs.

    Small objects under ``path/to/raw/log/YYYY/MM/DD/`` are combined into
    larger files under a Hive-partition-style prefix
    (``year=YYYY/month=MM/day=DD``) in the same bucket.

    Returns a single-element list suitable for ``add_job_flow_steps``.
    """
    src = 's3://{0}/path/to/raw/log/{1}/'.format(
        bucket_name, target_date.strftime('%Y/%m/%d'))
    dest = 's3://{0}/path/to/merge/log/{1}/'.format(
        bucket_name, target_date.strftime('year=%Y/month=%m/day=%d'))
    step = {
        'Name': 'S3DistCp step',
        # Alternatives: 'TERMINATE_JOB_FLOW' | 'TERMINATE_CLUSTER' | 'CONTINUE'
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': '/usr/share/aws/emr/s3-dist-cp/lib/s3-dist-cp.jar',
            'Args': [
                '--src', src,
                '--dest', dest,
                '--groupBy', "(.)*",     # group every file together
                '--targetSize', '2048',  # target merged file size (MB)
            ],
        },
    }
    return [step]
def get_hive_script_step(bucket_name, target_date):
    """Build the EMR step that runs the Hive script for *target_date*.

    The script at ``s3://<bucket>/path/to/hive-script/script.hql`` receives
    ``YEAR``/``MONTH``/``DAY`` substitution variables (month and day
    zero-padded) so it can address the partition written by the S3DistCp step.

    Returns a single-element list suitable for ``add_job_flow_steps``.
    """
    script_path = 's3://{0}/path/to/hive-script/script.hql'.format(bucket_name)
    hive_args = BASE_ARGS_HIVE_STEP + [
        script_path,
        '-d', 'YEAR={0}'.format(target_date.year),
        '-d', 'MONTH={0:02d}'.format(target_date.month),
        '-d', 'DAY={0:02d}'.format(target_date.day),
    ]
    return [{
        'Name': 'Hive script step',
        # Alternatives: 'TERMINATE_JOB_FLOW' | 'TERMINATE_CLUSTER' | 'CONTINUE'
        'ActionOnFailure': 'CANCEL_AND_WAIT',
        'HadoopJarStep': {
            'Jar': SCRIPT_RUNNER_JAR,
            'Args': hive_args,
        },
    }]
def check_cluster_status(region_name, jobflow_id, created_after):
    """Look up the current status of the EMR cluster *jobflow_id*.

    Lists clusters created after *created_after* and searches for the
    matching id.

    Returns a ``(state, message)`` tuple — *message* is None when the
    state-change reason carries no 'Message' key — or None when the
    cluster is not found in the listing.
    """
    client = boto3.client('emr', region_name=region_name)
    # NOTE(review): list_clusters is paginated; a cluster older than the
    # first page would be missed. Fine for the short windows used here.
    response = client.list_clusters(
        CreatedAfter=created_after,
    )
    for cluster in response['Clusters']:
        if cluster['Id'] != jobflow_id:
            continue
        status = cluster['Status']
        # dict.has_key() was removed in Python 3; .get() covers both the
        # present and absent 'Message' cases in one call.
        return (status['State'], status['StateChangeReason'].get('Message'))
    return None
if __name__ == '__main__':
    import setting
    import sys
    import time
    from datetime import datetime, date, timedelta

    # Lower bound for the list_clusters lookup: anything created within
    # the last day covers the cluster we are about to launch.
    created_after = datetime.now() - timedelta(days=1)

    jobflow_id = create_cluster(setting.REGION_NAME,
                                setting.CLUSTER_NAME,
                                setting.LOG_URI,
                                setting.RELEASE_LABEL,
                                setting.INSTANCE_TYPE_MASTER,
                                setting.INSTANCE_COUNT_MASTER,
                                setting.INSTANCE_TYPE_SLAVE,
                                setting.INSTANCE_COUNT_SLAVE,
                                setting.KEY_NAME,
                                setting.SUBNET_ID,
                                setting.SECURITY_GROUP_SERVICE_ACCESS,
                                setting.SECURITY_GROUP_MASTER,
                                setting.SECURITY_GROUP_MASTER_ADDITIONAL,
                                setting.SECURITY_GROUP_SLAVE,
                                setting.SECURITY_GROUP_SLAVE_ADDITIONAL,
                                setting.HIVE_METASTORE_URL,
                                setting.HIVE_METASTORE_USER,
                                setting.HIVE_METASTORE_PASSWORD,
                                setting.BUCKET_NAME,
                                date(2016, 2, 11))

    # Poll once a minute until the cluster reaches TERMINATED, then use
    # the state-change message to decide success vs. failure.
    while True:
        status = check_cluster_status(setting.REGION_NAME,
                                      jobflow_id,
                                      created_after)
        if not (status and status[0] == 'TERMINATED'):
            time.sleep(60)
            continue
        if status[1] == 'Steps completed':
            print('complete')
            break
        print('error')
        sys.exit(1)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# setting.py -- deployment-specific configuration consumed via
# ``import setting`` by the EMR cluster-launch script.
# Replace the YOUR_* placeholders with values for your AWS account.

# AWS region to launch the cluster in.
REGION_NAME = 'ap-northeast-1'
# Cluster display name and S3 location for EMR logs.
CLUSTER_NAME = 'Cluster Name'
LOG_URI = 's3://path/to/emr/log/'
# EC2 key pair for SSH access to cluster nodes.
KEY_NAME = 'YOUR_KEY_NAME'
# EMR release (determines Hadoop/Hive versions).
RELEASE_LABEL = 'emr-4.2.0'
# Instance sizing for the master and core (slave) groups.
INSTANCE_TYPE_MASTER = 'm1.medium'
INSTANCE_COUNT_MASTER = 1
INSTANCE_TYPE_SLAVE = 'm1.medium'
INSTANCE_COUNT_SLAVE = 1
# VPC networking: subnet plus managed/additional security groups.
SUBNET_ID = 'YOUR_SUBNET_ID'
SECURITY_GROUP_SERVICE_ACCESS = 'YOUR_SECURITY_GROUP'
SECURITY_GROUP_MASTER = 'YOUR_SECURITY_GROUP'
SECURITY_GROUP_MASTER_ADDITIONAL = ['YOUR_SECURITY_GROUP']
SECURITY_GROUP_SLAVE = 'YOUR_SECURITY_GROUP'
SECURITY_GROUP_SLAVE_ADDITIONAL = ['YOUR_SECURITY_GROUP']
# External Hive metastore (JDBC/RDS) connection settings.
HIVE_METASTORE_URL = 'jdbc:mysql://YOUR_RDS_URL/hive?createDatabaseIfNotExist=true'
HIVE_METASTORE_USER = 'YOUR_RDS_USER'
HIVE_METASTORE_PASSWORD = 'YOUR_RDS_PASSWORD'
# S3 bucket holding the raw logs, merged output, and Hive script.
BUCKET_NAME = 'YOUR_BUCKET_NAME'
@karsubr
Copy link

karsubr commented Oct 17, 2018

Can you please clarify the "import setting" statement? Did you just import that package? How is `setting` able to return your AWS account info?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment