import time

import awswrangler as wr
import boto3
from prefect import task, Flow, resource_manager
from prefect.engine.signals import FAIL
from prefect.run_configs import UniversalRun
from prefect.storage import S3  # used by the commented-out storage config at the bottom


@task
def add_emr_step(cluster_id: str, steps: list) -> str:
    """Submit the given steps to a running EMR cluster and return the first step ID."""
    emr_client = boto3.client("emr", region_name="us-west-2")
    response = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=steps)
    return response["StepIds"][0]  # todo deal with array out of bounds
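
# One way to resolve the todo above, sketched as a guard (not part of the
# original gist) that fails the task when EMR returns no step IDs:
#     step_ids = response.get("StepIds") or []
#     if not step_ids:
#         raise FAIL(f"no steps were added to cluster {cluster_id}")
#     return step_ids[0]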


@task
def wait_emr_step(cluster_id: str, step_id: str):
    # poll instead of busy-waiting, and fail the task if the step errors out
    terminal = ("COMPLETED", "CANCELLED", "FAILED", "INTERRUPTED")
    while (state := wr.emr.get_step_state(cluster_id, step_id)) not in terminal:
        time.sleep(30)
    if state != "COMPLETED":
        raise FAIL(f"EMR step {step_id} finished in state {state}")
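

# A possible alternative (a sketch, not part of the original gist): boto3 ships a
# built-in "step_complete" waiter for EMR that handles the polling delay and
# timeout itself, and raises a WaiterError (failing the task) if the step errors:
@task
def wait_emr_step_with_waiter(cluster_id: str, step_id: str):
    emr_client = boto3.client("emr", region_name="us-west-2")
    emr_client.get_waiter("step_complete").wait(
        ClusterId=cluster_id,
        StepId=step_id,
        WaiterConfig={"Delay": 30, "MaxAttempts": 120},  # poll every 30s for up to ~1h
    )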


SPARK_STEPS_SUCCESS: list = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

# "x" is not a valid partition count, so this step fails on purpose
SPARK_STEPS_ERROR: list = [
    {
        "Name": "calculate_pi_failure",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "x"],
        },
    }
]
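
# For reference, a step that runs a user-supplied PySpark script would look
# roughly like this (a sketch; the S3 path is a placeholder, not an object
# referenced anywhere in this gist):
#     {
#         "Name": "my_pyspark_job",
#         "ActionOnFailure": "CONTINUE",
#         "HadoopJarStep": {
#             "Jar": "command-runner.jar",
#             "Args": ["spark-submit", "--deploy-mode", "cluster", "s3://example-bucket/my_job.py"],
#         },
#     }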


@resource_manager
class EMRCluster:
    def __init__(self, region_name="us-west-2"):
        self.region_name = region_name

    def setup(self):
        cluster_id = self.create_emr_cluster(cluster_name="prefect-example_cluster")
        return cluster_id

    def create_emr_cluster(self, cluster_name: str):
        """
        :param cluster_name: name of the EMR cluster
        :return: ID of the created cluster
        See API https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow
        """
        emr_client = boto3.client("emr", region_name=self.region_name)
        response = emr_client.run_job_flow(
            Name=cluster_name,
            ReleaseLabel="emr-6.0.0",
            LogUri="s3://carpe-spark-logs/",
            Applications=[
                {"Name": "Spark"},
                {"Name": "Hadoop"},
                {"Name": "Hive"},
                {"Name": "Ganglia"},
            ],
            Configurations=[
                {
                    "Classification": "spark",
                    "Properties": {
                        "maxRemoteBlockSizeFetchToMem": "2gb",
                        "maximizeResourceAllocation": "true",
                    },
                },
                {
                    "Classification": "spark-defaults",
                    "Properties": {
                        "spark.dynamicAllocation.enabled": "true",
                        "spark.local.dir": "/mnt",
                    },
                },
                {
                    "Classification": "spark-hive-site",
                    "Properties": {
                        "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
                    },
                },
            ],
            Instances={
                "InstanceGroups": [
                    {
                        "Name": "Master nodes",
                        "Market": "ON_DEMAND",
                        "InstanceRole": "MASTER",
                        "InstanceType": "m5.xlarge",
                        "InstanceCount": 1,
                    },
                    {
                        "Name": "Slave nodes",
                        "Market": "SPOT",  # default max bid price is equal to ON_DEMAND
                        "InstanceRole": "CORE",
                        "InstanceType": "m5.xlarge",
                        "InstanceCount": 2,
                    },
                ],
                "Ec2KeyName": "KeyCarpeDataDOPS",
                "Ec2SubnetId": "subnet-c7fb0aa3",
                "EmrManagedMasterSecurityGroup": "sg-50770d21",
                "EmrManagedSlaveSecurityGroup": "sg-32710b43",
                "KeepJobFlowAliveWhenNoSteps": True,
                "TerminationProtected": False,
            },
            ScaleDownBehavior="TERMINATE_AT_TASK_COMPLETION",
            StepConcurrencyLevel=1,
            EbsRootVolumeSize=10,
            BootstrapActions=[],
            VisibleToAllUsers=True,
            JobFlowRole="EMR_EC2_DefaultRole",
            ServiceRole="EMR_DefaultRole",
            Tags=[
                {"Key": "Name", "Value": "devops_prefect"},
                {"Key": "project", "Value": "devops_prefect"},
                {"Key": "env_type", "Value": "non_prod"},
                {"Key": "environment", "Value": "dev"},
                {"Key": "dd-monitor", "Value": "ignore"},
                {"Key": "DataStore", "Value": ""},
            ],
        )
        # todo check if the creation was successful, deal with array out of bounds
        return response["JobFlowId"]

    def terminate_cluster(self, cluster_id: str):
        emr_client = boto3.client("emr", region_name=self.region_name)
        response = emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
        return response

    def cleanup(self, cluster_id):
        self.terminate_cluster(cluster_id=cluster_id)
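
# How the @resource_manager lifecycle plays out below: setup() runs first and its
# return value (the job flow ID) is what the `with EMRCluster() as ...` target
# binds to, and cleanup() runs once all tasks inside the block have finished, so
# the cluster is terminated even if one of the steps fails.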


with Flow(
    name="example_emr_jobs-success_error", run_config=UniversalRun(labels=["aws"])
) as flow:
    with EMRCluster() as emr_cluster_id:
        # success step
        add_step_one_id = add_emr_step(
            cluster_id=emr_cluster_id,
            steps=SPARK_STEPS_SUCCESS,
            task_args=dict(name="add_step_one"),
        )
        wait_step_one = wait_emr_step(
            cluster_id=emr_cluster_id,
            step_id=add_step_one_id,
            task_args=dict(name="wait_step_one"),
        )
        # failure step
        add_step_two = add_emr_step(
            cluster_id=emr_cluster_id,
            steps=SPARK_STEPS_ERROR,
            task_args=dict(name="add_step_two"),
        )
        wait_step_two = wait_emr_step(
            cluster_id=emr_cluster_id,
            step_id=add_step_two,
            task_args=dict(name="wait_step_two"),
        )
        # execution order: submit the failing step only after the first step is done
        add_step_two.set_upstream(wait_step_one)

# flow.storage = S3(
#     bucket="carpe-prefect-rivdata-dev",
#     local_script_path="Example_emr_jobs-success_error.py",
#     stored_as_script=True,
# )

# to run visualize, make sure graphviz is installed on your Mac: brew install graphviz
flow.visualize()  # view the task graph locally
# flow.storage.build()
# flow.register(project_name="carpe_poc")
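
# For a quick end-to-end test without registering the flow, Prefect 1.x flows
# can also be run in-process (note: this creates and terminates a real EMR
# cluster, which incurs AWS costs):
# if __name__ == "__main__":
#     flow.run()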