import airflowlib.emr_lib as emr
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2017, 11, 1),
    'retries': 0,
    'retry_delay': timedelta(minutes=2),
    'provide_context': True
}
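# provide_context=True (Airflow 1.x) passes the task context, including the
# TaskInstance as kwargs['ti'], to every PythonOperator callable below; this
# is how the functions read the cluster id from XCom.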
# Initialize the DAG.
# concurrency caps the number of tasks from this DAG that may run at once.
dag = DAG('transform_movielens', concurrency=3, schedule_interval=None, default_args=default_args)
region = emr.get_region()
emr.client(region_name=region)
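# airflowlib.emr_lib is a project-local helper module, not part of Airflow.
# It is assumed to wrap the boto3 EMR API (cluster create/terminate) plus the
# Livy REST endpoint on the master node (Spark sessions and statements).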
# Creates an EMR cluster
def create_emr(**kwargs):
    cluster_id = emr.create_cluster(region_name=region, cluster_name='movielens_cluster', num_core_nodes=2)
    return cluster_id
# Waits for the EMR cluster to be ready to accept jobs
def wait_for_completion(**kwargs):
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    emr.wait_for_cluster_creation(cluster_id)
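# xcom_pull works because PythonOperator automatically pushes its callable's
# return value to XCom, so the cluster_id returned by create_emr is visible
# to every downstream task in the DAG.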
# Terminates the EMR cluster
def terminate_emr(**kwargs):
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    emr.terminate_cluster(cluster_id)
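# Each transform below follows the same session lifecycle: open a Spark
# session on the cluster, wait for it to become idle, submit the script as a
# statement, poll the statement until it completes, then close the session.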
# Converts each of the MovieLens data files to Parquet
def transform_movies_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,
                                              '/root/airflow/dags/transform/movies.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)
def transform_tags_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,
                                              '/root/airflow/dags/transform/tags.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)
def transform_ratings_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,
                                              '/root/airflow/dags/transform/ratings.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)
def transform_links_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,
                                              '/root/airflow/dags/transform/links.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)
def transform_genome_scores_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,
                                              '/root/airflow/dags/transform/genome_scores.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)
def transform_genome_tags_to_parquet(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'spark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,
                                              '/root/airflow/dags/transform/genome_tags.scala')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)
def create_joins(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    # The join script is Python, so a 'pyspark' session is requested instead
    # of the 'spark' (Scala) sessions used for the transforms above.
    headers = emr.create_spark_session(cluster_dns, 'pyspark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,
                                              '/root/airflow/dags/transform/joins.py')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)
def create_linear_regression(**kwargs):
    # ti is the Task Instance
    ti = kwargs['ti']
    cluster_id = ti.xcom_pull(task_ids='create_cluster')
    cluster_dns = emr.get_cluster_dns(cluster_id)
    headers = emr.create_spark_session(cluster_dns, 'pyspark')
    session_url = emr.wait_for_idle_session(cluster_dns, headers)
    statement_response = emr.submit_statement(session_url,
                                              '/root/airflow/dags/transform/linear_regression.py')
    emr.track_statement_progress(cluster_dns, statement_response.headers)
    emr.kill_spark_session(session_url)
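# The six transform callables differ only in script path and session kind, so
# a small factory could generate them instead; a minimal sketch (not wired
# into the operators below, shown only to illustrate the shared pattern):
def make_transform(script_path, session_kind='spark'):
    def _transform(**kwargs):
        ti = kwargs['ti']
        cluster_id = ti.xcom_pull(task_ids='create_cluster')
        cluster_dns = emr.get_cluster_dns(cluster_id)
        headers = emr.create_spark_session(cluster_dns, session_kind)
        session_url = emr.wait_for_idle_session(cluster_dns, headers)
        response = emr.submit_statement(session_url, script_path)
        emr.track_statement_progress(cluster_dns, response.headers)
        emr.kill_spark_session(session_url)
    return _transform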
# Define the individual tasks using PythonOperators
create_cluster = PythonOperator(
    task_id='create_cluster',
    python_callable=create_emr,
    dag=dag)
wait_for_cluster_completion = PythonOperator(
    task_id='wait_for_cluster_completion',
    python_callable=wait_for_completion,
    dag=dag)
transform_movies = PythonOperator(
    task_id='transform_movies',
    python_callable=transform_movies_to_parquet,
    dag=dag)
transform_ratings = PythonOperator(
    task_id='transform_ratings',
    python_callable=transform_ratings_to_parquet,
    dag=dag)
transform_tags = PythonOperator(
    task_id='transform_tags',
    python_callable=transform_tags_to_parquet,
    dag=dag)
transform_links = PythonOperator(
    task_id='transform_links',
    python_callable=transform_links_to_parquet,
    dag=dag)
transform_genome_scores = PythonOperator(
    task_id='transform_genome_scores',
    python_callable=transform_genome_scores_to_parquet,
    dag=dag)
transform_genome_tags = PythonOperator(
    task_id='transform_genome_tags',
    python_callable=transform_genome_tags_to_parquet,
    dag=dag)
# Note: this rebinds the name create_joins from the callable to the operator;
# python_callable still receives the function because the right-hand side is
# evaluated before the assignment.
create_joins = PythonOperator(
    task_id='create_joins',
    python_callable=create_joins,
    dag=dag)
create_machine_learning = PythonOperator(
    task_id='create_linear_regression',
    python_callable=create_linear_regression,
    dag=dag)
# trigger_rule='all_done' runs termination even when an upstream task fails,
# so the EMR cluster is not left running (and billing) after an error.
terminate_cluster = PythonOperator(
    task_id='terminate_cluster',
    python_callable=terminate_emr,
    trigger_rule='all_done',
    dag=dag)
# Construct the DAG by setting the dependencies: the six transforms fan out
# once the cluster is ready and fan back in at create_joins.
create_cluster >> wait_for_cluster_completion
wait_for_cluster_completion >> transform_movies >> create_joins
wait_for_cluster_completion >> transform_ratings >> create_joins
wait_for_cluster_completion >> transform_links >> create_joins
wait_for_cluster_completion >> transform_tags >> create_joins
wait_for_cluster_completion >> transform_genome_scores >> create_joins
wait_for_cluster_completion >> transform_genome_tags >> create_joins
create_joins >> create_machine_learning >> terminate_cluster
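# In Airflow 1.10+, >> also accepts a list on either side, so the same
# fan-out/fan-in could be written more compactly (assuming that version):
#
#   transforms = [transform_movies, transform_ratings, transform_links,
#                 transform_tags, transform_genome_scores, transform_genome_tags]
#   wait_for_cluster_completion >> transforms >> create_joins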