@diogoaurelio · Last active November 23, 2017
emr_boto3_example.py
import logging
import time

import boto3

logging.basicConfig(level=logging.INFO)

# Placeholder cluster parameters; all values below are illustrative,
# substitute your own.
global_cluster_name = "SOME-CLUSTER-NAME"
core_instance_count = 2
core_instance_type = "m4.xlarge"
task_instance_count = 2
task_instance_type = "m4.xlarge"
task_spot_bid_price = "0.10"  # USD, passed as a string
master_instance_type = "m4.xlarge"
s3_bucket_presto = "SOME-BUCKET"
s3_key_presto = "SOME-KEY/presto-bootstrap.sh"

client = boto3.client("emr")  # region is taken from your AWS configuration

# Applications to install on the cluster.
applications = [
    {'Name': 'Spark'},
    {'Name': 'Hive'},
    {'Name': 'Tez'},
    {'Name': 'Hadoop'},
    {'Name': 'Ganglia'},
    {'Name': 'Presto'},
    {'Name': 'Zeppelin'}
]
release_label = "emr-5.9.0"
log_uri = "s3n://SOME-LOGS-BUCKET/"
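
# EMR configuration classifications: each top-level entry targets one
# application's config file, and a nested "export" classification sets
# environment variables (here, pointing PySpark at a conda-managed Python 3.6).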
configurations = [
    {
        "Classification": "spark-env",
        "Properties": {},
        "Configurations": [
            {
                "Classification": "export",
                "Properties": {
                    "PYSPARK_PYTHON": "/mnt/pyspark/miniconda/envs/spark/bin/python3.6",
                    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0",
                    "PYSPARK_DRIVER_PYTHON": "/mnt/pyspark/miniconda/envs/spark/bin/python3.6"
                },
                "Configurations": []
            }
        ]
    },
    {
        "Classification": "hadoop-env",
        "Properties": {},
        "Configurations": [
            {
                "Classification": "export",
                "Properties": {
                    "JAVA_HOME": "/usr/lib/jvm/java-1.8.0"
                },
                "Configurations": []
            }
        ]
    },
    {
        "Classification": "hive-site",
        "Properties": {
            "javax.jdo.option.ConnectionUserName": "SOME-USER",
            "javax.jdo.option.ConnectionDriverName": "org.mariadb.jdbc.Driver",
            "javax.jdo.option.ConnectionPassword": "SOME-PASSWORD",
            "javax.jdo.option.ConnectionURL": "jdbc:mysql://HOST:3306/SOME-HIVEDB?createDatabaseIfNotExist=true"
        },
        "Configurations": []
    },
    {
        "Classification": "presto-connector-hive",
        "Properties": {
            "hive.parquet-optimized-reader.enabled": "true",
            "hive.parquet-predicate-pushdown.enabled": "true",
            "hive.parquet.use-column-names": "true",
            "hive.orc.use-column-names": "true"
        }
    },
    {
        "Classification": "hue-ini",
        "Properties": {},
        "Configurations": [
            {
                "Classification": "desktop",
                "Properties": {},
                "Configurations": [
                    {
                        "Classification": "database",
                        "Properties": {
                            "password": "SOME-PWD",
                            "engine": "mysql",
                            "port": "3306",
                            "host": "HUE-HOST",
                            "name": "HUE-DB",
                            "user": "HUE-USER"
                        },
                        "Configurations": []
                    }
                ]
            }
        ]
    }
]

# A single trivial step used as a sanity check; swap in a real job here.
steps = [
    {
        "ActionOnFailure": "TERMINATE_JOB_FLOW",
        "Name": "Test S3FS script",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["echo", "1"]
        }
    }
]
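
# A production step would typically launch a job through the same
# command-runner.jar mechanism, e.g. via spark-submit (bucket and script
# names below are illustrative only):
#
#     {
#         "Name": "My Spark job",
#         "ActionOnFailure": "CONTINUE",
#         "HadoopJarStep": {
#             "Jar": "command-runner.jar",
#             "Args": ["spark-submit", "--deploy-mode", "cluster",
#                      "s3://SOME-BUCKET/jobs/my_job.py"]
#         }
#     }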

# Instance group configuration: on-demand CORE nodes, SPOT TASK nodes, and a
# single MASTER node with an attached 32 GB gp2 EBS volume.
instance_groups = [
    {
        "InstanceCount": core_instance_count,
        "InstanceRole": "CORE",
        "InstanceType": core_instance_type,
        "Name": "CORE"
    },
    {
        "InstanceCount": task_instance_count,
        "BidPrice": task_spot_bid_price,
        "InstanceRole": "TASK",
        "InstanceType": task_instance_type,
        "Market": "SPOT",
        "Name": "TASK"
    },
    {
        "InstanceCount": 1,
        "EbsConfiguration": {
            "EbsBlockDeviceConfigs": [
                {
                    "VolumeSpecification": {
                        "SizeInGB": 32,
                        "VolumeType": "gp2"
                    },
                    "VolumesPerInstance": 1
                }
            ]
        },
        "InstanceRole": "MASTER",
        "InstanceType": master_instance_type,
        "Name": "MASTER"
    }
]
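
# "Market" defaults to ON_DEMAND when omitted; for SPOT groups, "BidPrice"
# is the maximum hourly price in USD, passed as a string.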
instances = {
    "KeepJobFlowAliveWhenNoSteps": True,
    "TerminationProtected": False,
    "Ec2KeyName": "SOME-SSH-KEY",
    "Ec2SubnetId": "SOME-SUBNET-ID",
    "EmrManagedSlaveSecurityGroup": "SOME-SG-ID",
    "EmrManagedMasterSecurityGroup": "SOME-SG-ID",
    "ServiceAccessSecurityGroup": "SOME-SERVICE-SG-ID",
    "InstanceGroups": instance_groups
}
job_flow_role = "SOME-INSTANCE-ROLE"
service_role = "SOME-SERVICE-ROLE"
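
# JobFlowRole is the EC2 instance profile assumed by the cluster nodes;
# ServiceRole is the IAM role that the EMR service itself assumes.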
tags = [
    {"Key": "Name", "Value": global_cluster_name},
    {"Key": "Team", "Value": "YOUR-TEAM"},
    {"Key": "Managed", "Value": "BY-SOME-SCHEDULER"},
    {"Key": "Environment", "Value": "dev"}
]
bootstrap_actions = [
    {
        "Name": "presto_configuration",
        "ScriptBootstrapAction": {
            "Path": "s3://{0}/{1}".format(s3_bucket_presto, s3_key_presto)
        }
    }
]
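
# Bootstrap actions run on every node after provisioning and before the
# applications are installed; here they ship extra Presto configuration.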
response = client.run_job_flow(
    Applications=applications,
    Name=global_cluster_name,
    BootstrapActions=bootstrap_actions,
    ReleaseLabel=release_label,
    LogUri=log_uri,
    Steps=steps,
    Tags=tags,
    Configurations=configurations,
    Instances=instances,
    JobFlowRole=job_flow_role,
    VisibleToAllUsers=True,
    ServiceRole=service_role
)
print("got response {0}".format(response))
job_flow_id = response['JobFlowId']
logging.info("Got job_flow_id {0}".format(job_flow_id))
cluster_id = job_flow_id
time.sleep(300)
cluster_description = client.describe_cluster(
ClusterId=cluster_id
)
print(cluster_description)
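
# A fixed five-minute sleep is brittle. boto3 also ships EMR waiters that
# poll DescribeCluster until a target state is reached; a minimal sketch:
waiter = client.get_waiter("cluster_running")
waiter.wait(
    ClusterId=cluster_id,
    WaiterConfig={"Delay": 30, "MaxAttempts": 60}  # poll every 30s, up to 30 min
)
logging.info("Cluster {0} is now running".format(cluster_id))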