Skip to content

Instantly share code, notes, and snippets.

@Mageswaran1989
Last active April 2, 2020 19:31
Show Gist options
  • Save Mageswaran1989/63e775bb3ff95bc37d28d0484ab9b662 to your computer and use it in GitHub Desktop.
Save Mageswaran1989/63e775bb3ff95bc37d28d0484ab9b662 to your computer and use it in GitHub Desktop.
from airflow.models import BaseOperator
class EMRMonitorOperator(BaseOperator):
    """Scan active EMR clusters, flag those to scale down, and send alerts.

    For every active cluster reported by the underlying ``EMRMonitor``, the
    operator calls ``check_and_scale_down()``. Each cluster for which that call
    returns a truthy flag triggers an e-mail via ``notify_email``. If at least
    one cluster was flagged, a single Slack alert listing all flagged clusters
    is sent via ``emr_slack_alert``.

    :param job_flow_id: EMR job flow id used to initialise the ``EMRMonitor``.
        ``execute`` re-points the monitor at each active cluster in turn.
    :param job_flow_name: EMR job flow name used to initialise the
        ``EMRMonitor``.
    """

    @apply_defaults
    def __init__(self, job_flow_id, job_flow_name, *args, **kwargs):
        super(EMRMonitorOperator, self).__init__(*args, **kwargs)
        # NOTE(review): operator __init__ runs on every DAG parse; if the
        # EMRMonitor constructor contacts AWS, consider building it lazily
        # inside execute() instead.
        self._emr = EMRMonitor(job_flow_id=job_flow_id, job_flow_name=job_flow_name)
        self._job_flow_id = job_flow_id
        self._job_flow_name = job_flow_name

    def execute(self, context):
        """Check every active cluster and alert on those flagged for scale-down.

        :param context: Airflow task context, forwarded to ``emr_slack_alert``.
        """
        # TODO: use Airflow Variables to monitor only a subset of clusters
        cluster_ids, cluster_names = self._emr.get_active_clusters_id_n_names()
        # self.log (BaseOperator's logger) writes to the task log once; the
        # previous print() + root logging.info() pair emitted everything twice.
        self.log.info("Active EMR cluster ids: %s", cluster_ids)
        self.log.info("Active EMR cluster names: %s", cluster_names)

        msgs = []
        # NOTE(review): zip() silently stops at the shorter sequence, and an
        # exception for one cluster aborts the sweep for all remaining ones.
        for cluster_id, cluster_name in zip(cluster_ids, cluster_names):
            # The single EMRMonitor instance is re-targeted at each cluster.
            self._emr.set_job_flow_id(job_flow_id=cluster_id)
            self._emr.set_job_flow_name(job_flow_name=cluster_name)
            res, num_core_nodes, num_tasks_nodes = self._emr.check_and_scale_down()
            if not res:
                continue
            # Record the flagged cluster for the aggregated Slack alert.
            msgs.append(
                "\nCluster {} is running {} Core node(s) and {} Tasks node(s).".format(
                    cluster_name, num_core_nodes, num_tasks_nodes))
            # get_email_id() is evaluated after the monitor is re-targeted, so
            # it reflects the current cluster (same call order as before).
            notify_email(cluster_name=cluster_name,
                         num_core_nodes=num_core_nodes,
                         num_tasks_nodes=num_tasks_nodes,
                         mail_id=self._emr.get_email_id())

        # A message is appended exactly when a cluster is flagged, so a
        # non-empty list is equivalent to "at least one cluster to scale down".
        if msgs:
            emr_slack_alert(context=context, msgs=msgs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment