import time

import awswrangler as wr
import boto3
from prefect import Flow, task, resource_manager
from prefect.run_configs import UniversalRun
from prefect.storage import S3


@task
def add_emr_step(cluster_id: str, steps: list) -> str:
    """Submit steps to a running EMR cluster and return the first step ID."""
    emr_client = boto3.client("emr", region_name="us-west-2")
    response = emr_client.add_job_flow_steps(JobFlowId=cluster_id, Steps=steps)
    step_ids = response.get("StepIds", [])
    if not step_ids:
        raise ValueError(f"No step IDs returned for cluster {cluster_id}")
    return step_ids[0]
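
# For a quick sanity check outside a flow run, a Prefect 1.x task can be
# invoked directly via its .run() method (the cluster ID below is a placeholder):
#
#     step_id = add_emr_step.run(cluster_id="j-XXXXXXXXXXXXX", steps=SPARK_STEPS_SUCCESS)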

@task
def wait_emr_step(cluster_id: str, step_id: str):
    state = wr.emr.get_step_state(cluster_id, step_id)
    while state not in ("COMPLETED", "FAILED", "CANCELLED", "INTERRUPTED"):
        time.sleep(30)  # poll periodically instead of busy-waiting
        state = wr.emr.get_step_state(cluster_id, step_id)
    if state != "COMPLETED":
        raise RuntimeError(f"EMR step {step_id} ended in state {state}")
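
# An equivalent status check with plain boto3, in case you prefer not to depend
# on awswrangler for polling (a sketch; assumes the same region as the cluster):
#
#     state = boto3.client("emr", region_name="us-west-2").describe_step(
#         ClusterId=cluster_id, StepId=step_id
#     )["Step"]["Status"]["State"]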

# A step that runs the built-in SparkPi example and succeeds.
SPARK_STEPS_SUCCESS: list = [
    {
        "Name": "calculate_pi",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "10"],
        },
    }
]

# The same step with a non-numeric argument ("x") so that it fails on purpose.
SPARK_STEPS_ERROR: list = [
    {
        "Name": "calculate_pi_failure",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": ["/usr/lib/spark/bin/run-example", "SparkPi", "x"],
        },
    }
]
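
# A hedged sketch (not part of the original gist): the same command-runner.jar
# mechanism can submit your own PySpark script via spark-submit; the step name
# and S3 path below are placeholders.
SPARK_STEPS_CUSTOM: list = [
    {
        "Name": "my_pyspark_job",  # hypothetical step name
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode",
                "cluster",
                "s3://your-bucket/path/to/script.py",  # placeholder script location
            ],
        },
    }
]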

@resource_manager
class EMRCluster:
    """Prefect resource manager: creates an EMR cluster and terminates it on exit."""

    def __init__(self, region_name="us-west-2"):
        self.region_name = region_name

    def setup(self):
        cluster_id = self.create_emr_cluster(cluster_name="prefect-example_cluster")
        return cluster_id

    def create_emr_cluster(self, cluster_name: str) -> str:
        """
        Create an EMR cluster and return the ID of the created cluster.

        See the API reference:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.run_job_flow
        """
        emr_client = boto3.client("emr", region_name=self.region_name)
        response = emr_client.run_job_flow(
            Name=cluster_name,
            ReleaseLabel="emr-6.0.0",
            LogUri="s3://carpe-spark-logs/",
            Applications=[
                {"Name": "Spark"},
                {"Name": "Hadoop"},
                {"Name": "Hive"},
                {"Name": "Ganglia"},
            ],
            Configurations=[
                {
                    "Classification": "spark",
                    "Properties": {
                        "maxRemoteBlockSizeFetchToMem": "2gb",
                        "maximizeResourceAllocation": "true",
                    },
                },
                {
                    "Classification": "spark-defaults",
                    "Properties": {
                        "spark.dynamicAllocation.enabled": "true",
                        "spark.local.dir": "/mnt",
                    },
                },
                {
                    "Classification": "spark-hive-site",
                    "Properties": {
                        "hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
                    },
                },
            ],
            Instances={
                "InstanceGroups": [
                    {
                        "Name": "Master nodes",
                        "Market": "ON_DEMAND",
                        "InstanceRole": "MASTER",
                        "InstanceType": "m5.xlarge",
                        "InstanceCount": 1,
                    },
                    {
                        "Name": "Slave nodes",
                        "Market": "SPOT",  # the default max bid price equals the ON_DEMAND price
                        "InstanceRole": "CORE",
                        "InstanceType": "m5.xlarge",
                        "InstanceCount": 2,
                    },
                ],
                "Ec2KeyName": "KeyCarpeDataDOPS",
                "Ec2SubnetId": "subnet-c7fb0aa3",
                "EmrManagedMasterSecurityGroup": "sg-50770d21",
                "EmrManagedSlaveSecurityGroup": "sg-32710b43",
                "KeepJobFlowAliveWhenNoSteps": True,
                "TerminationProtected": False,
            },
            ScaleDownBehavior="TERMINATE_AT_TASK_COMPLETION",
            StepConcurrencyLevel=1,
            EbsRootVolumeSize=10,
            BootstrapActions=[],
            VisibleToAllUsers=True,
            JobFlowRole="EMR_EC2_DefaultRole",
            ServiceRole="EMR_DefaultRole",
            Tags=[
                {"Key": "Name", "Value": "devops_prefect"},
                {"Key": "project", "Value": "devops_prefect"},
                {"Key": "env_type", "Value": "non_prod"},
                {"Key": "environment", "Value": "dev"},
                {"Key": "dd-monitor", "Value": "ignore"},
                {"Key": "DataStore", "Value": ""},
            ],
        )
        # boto3 raises a ClientError if the request fails, so a successful
        # response always contains the JobFlowId.
        return response["JobFlowId"]
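
    # run_job_flow returns as soon as the cluster is requested, while the
    # cluster itself may still be provisioning; EMR queues steps submitted in
    # the meantime. If you need to block until the cluster is ready, a sketch
    # using a plain boto3 EMR client (states per the EMR docs include STARTING,
    # BOOTSTRAPPING, RUNNING, WAITING):
    #
    #     state = emr_client.describe_cluster(ClusterId=cluster_id)[
    #         "Cluster"
    #     ]["Status"]["State"]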

    def terminate_cluster(self, cluster_id: str):
        emr_client = boto3.client("emr", region_name=self.region_name)
        response = emr_client.terminate_job_flows(JobFlowIds=[cluster_id])
        return response

    def cleanup(self, cluster_id):
        # Called by the resource manager when the `with` block exits.
        self.terminate_cluster(cluster_id=cluster_id)
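
# The @resource_manager decorator turns EMRCluster into linked tasks: setup()
# runs first and its return value (the cluster ID) is what the `with` block
# below yields; cleanup() runs once the block's tasks finish, which is what
# terminates the cluster even when one of the steps fails.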

with Flow(
    name="example_emr_jobs-success_error", run_config=UniversalRun(labels=["aws"])
) as flow:
    with EMRCluster() as emr_cluster_id:
        # step that succeeds
        add_step_one_id = add_emr_step(
            cluster_id=emr_cluster_id,
            steps=SPARK_STEPS_SUCCESS,
            task_args=dict(name="add_step_one"),
        )
        wait_step_one = wait_emr_step(
            cluster_id=emr_cluster_id,
            step_id=add_step_one_id,
            task_args=dict(name="wait_step_one"),
        )
        # step that fails
        add_step_two = add_emr_step(
            cluster_id=emr_cluster_id,
            steps=SPARK_STEPS_ERROR,
            task_args=dict(name="add_step_two"),
        )
        wait_step_two = wait_emr_step(
            cluster_id=emr_cluster_id,  # was add_step_two, which is a step ID, not a cluster ID
            step_id=add_step_two,
            task_args=dict(name="wait_step_two"),
        )
        # execution order: submit the failing step only after the first one finished
        add_step_two.set_upstream(wait_step_one)
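
        # Alternatively (a sketch using the same Prefect 1.x API), the dependency
        # could be declared inline when calling the task instead of via set_upstream:
        #
        #     add_step_two = add_emr_step(
        #         cluster_id=emr_cluster_id,
        #         steps=SPARK_STEPS_ERROR,
        #         upstream_tasks=[wait_step_one],
        #         task_args=dict(name="add_step_two"),
        #     )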

# flow.storage = S3(
#     bucket="carpe-prefect-rivdata-dev",
#     local_script_path="Example_emr_jobs-success_error.py",
#     stored_as_script=True,
# )

# To render the graph, make sure graphviz is installed (on macOS: brew install graphviz).
flow.visualize()  # comment this out if you don't want the graph rendered locally
# flow.storage.build()
# flow.register(project_name="carpe_poc")
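
# To test the flow in-process without registering it, Prefect 1.x also supports:
#
#     flow.run()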