@sprzedwojski
Created April 25, 2019 16:23
# subdag_test.py: sub-workflow module, imported by the main DAG below
# via `from subdag_test import sub_dag`.
from airflow.utils import dates
from airflow.contrib.operators import dataproc_operator
from airflow import models
from airflow.utils.trigger_rule import TriggerRule
from o2a_libs.el_basic_functions import *
from airflow.operators import bash_operator
import datetime
from o2a_libs.el_wf_functions import *
from airflow.operators import dummy_operator

PARAMS = {
    "user.name": "szymon",
    "nameNode": "hdfs://",
    "resourceManager": "localhost:8032",
    "queueName": "default",
    "examplesRoot": "examples",
    "oozie.wf.application.path": "hdfs:///user/mapreduce/examples/mapreduce",
    "outputDir": "output",
    "dataproc_cluster": "oozie-o2a-2cpu",
    "gcp_conn_id": "google_cloud_default",
    "gcp_region": "europe-west3",
    "hadoop_jars": "hdfs:/user/mapreduce/examples/mapreduce/lib/wordcount.jar",
    "hadoop_main_class": "WordCount",
}

def sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
    # Sub-workflow: prepare the HDFS output directory, then run the WordCount
    # MapReduce job on Dataproc.
    with models.DAG(
        "{0}.{1}".format(parent_dag_name, child_dag_name),
        schedule_interval=schedule_interval,  # Change to suit your needs
        start_date=start_date,  # Change to suit your needs
    ) as dag:
        mr_node_prepare = bash_operator.BashOperator(
            task_id="mr_node_prepare",
            bash_command='$DAGS_FOLDER/../data/prepare.sh -c oozie-o2a-2cpu -r europe-west3 -d "/user/mapreduce/examples/mapreduce/output"',
        )
        mr_node = dataproc_operator.DataProcHadoopOperator(
            main_class=PARAMS["hadoop_main_class"],
            arguments=[
                "/user/mapreduce/examples/mapreduce/input",
                "/user/mapreduce/examples/mapreduce/output",
            ],
            files=["hdfs:///user/mapreduce/examples/mapreduce/lib/wordcount.jar"],
            cluster_name=PARAMS["dataproc_cluster"],
            task_id="mr_node",
            trigger_rule="all_success",
            dataproc_hadoop_properties={
                "mapred.job.queue.name": "default",
                "mapreduce.mapper.class": "WordCount$Map",
                "mapreduce.reducer.class": "WordCount$Reduce",
                "mapreduce.combine.class": "WordCount$Reduce",
                "mapreduce.job.output.key.class": "org.apache.hadoop.io.Text",
                "mapreduce.job.output.value.class": "org.apache.hadoop.io.IntWritable",
                "mapred.input.dir": "/user/mapreduce/examples/mapreduce/input",
                "mapred.output.dir": "/user/mapreduce/examples/mapreduce/output",
            },
            dataproc_hadoop_jars=PARAMS["hadoop_jars"],
            gcp_conn_id=PARAMS["gcp_conn_id"],
            region=PARAMS["gcp_region"],
            dataproc_job_id="mr_node",
        )
        mr_node_prepare.set_downstream(mr_node)
    return dag

# Main workflow DAG file (dag_id: test_demo_dag); uses sub_dag from subdag_test.py above.
from airflow.utils import dates
from airflow.contrib.operators import dataproc_operator
from airflow import models
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators.subdag_operator import SubDagOperator
from o2a_libs.el_basic_functions import *
from airflow.operators import bash_operator
import datetime
from subdag_test import sub_dag
from o2a_libs.el_wf_functions import *
from o2a_libs.el_basic_functions import first_not_null
from airflow.operators import dummy_operator
import shlex
from airflow.operators import python_operator

PARAMS = {
    "user.name": "demo",
    "nameNode": "hdfs://",
    "resourceManager": "localhost:8032",
    "queueName": "default",
    "examplesRoot": "examples",
    "streamingMapper": "/bin/cat",
    "streamingReducer": "/usr/bin/wc",
    "oozie.use.system.libpath": "true",
    "oozie.wf.application.path": "hdfs:///user/demo/examples/apps/demo",
    "dataproc_cluster": "oozie-o2a-2cpu",
    "gcp_conn_id": "google_cloud_default",
    "gcp_region": "europe-west3",
    "gcp_uri_prefix": "gs://europe-west1-o2a-integratio-f690ede2-bucket/dags",
}

with models.DAG(
    "test_demo_dag",
    schedule_interval=None,  # Change to suit your needs
    start_date=dates.days_ago(0),  # Change to suit your needs
) as dag:
    # Cleanup: create and then delete the demo-cleanup directory via Pig `fs` commands on Dataproc.
    cleanup_node_fs_0_mkdir = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -mkdir -p /user/fs/examples/demo-cleanup"),
        ),
        task_id="cleanup_node_fs_0_mkdir",
    )
    cleanup_node_fs_1_delete = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote("fs -rm -r /user/fs/examples/demo-cleanup"),
        ),
        task_id="cleanup_node_fs_1_delete",
    )
    cleanup_node_fs_0_mkdir.set_downstream(cleanup_node_fs_1_delete)

    fork_node = dummy_operator.DummyOperator(task_id="fork_node", trigger_rule="all_success")

    # Pig branch: prepare HDFS paths, then run id.pig on Dataproc.
    pig_node_prepare = bash_operator.BashOperator(
        task_id="pig_node_prepare",
        bash_command='$DAGS_FOLDER/../data/prepare.sh -c oozie-o2a-2cpu -r europe-west3 -d "/user/pig/examples/test_pig_node/output-data" -m "/user/pig/examples/test_pig_node/created-folder"',
    )
    pig_node = dataproc_operator.DataProcPigOperator(
        query_uri="{}/{}".format(PARAMS["gcp_uri_prefix"], "id.pig"),
        task_id="pig_node",
        trigger_rule="all_success",
        variables={
            "INPUT": "/user/pig/examples/test_pig_node/input-data/test-data.txt",
            "OUTPUT": "/user/pig/examples/test_pig_node/output-data",
        },
        dataproc_pig_properties={"mapred.job.queue.name": "default", "mapred.map.output.compress": "false"},
        cluster_name=PARAMS["dataproc_cluster"],
        gcp_conn_id=PARAMS["gcp_conn_id"],
        region=PARAMS["gcp_region"],
        dataproc_job_id="pig_node",
    )
    pig_node_prepare.set_downstream(pig_node)

    # Sub-workflow branch: the MapReduce sub-DAG defined in subdag_test.py.
    subworkflow_node = SubDagOperator(
        subdag=sub_dag(dag.dag_id, "subworkflow_node", dag.start_date, dag.schedule_interval),
        task_id="subworkflow_node",
    )

    # Shell branch: prepare HDFS paths, then run `sh java -version` through a Pig job.
    shell_node_prepare = bash_operator.BashOperator(
        task_id="shell_node_prepare",
        bash_command='$DAGS_FOLDER/../data/prepare.sh -c oozie-o2a-2cpu -r europe-west3 -d "/examples/output-data/demo/pig-node" -m "/examples/input-data/demo/pig-node"',
    )
    shell_node = bash_operator.BashOperator(
        task_id="shell_node",
        bash_command="gcloud dataproc jobs submit pig --cluster={0} --region={1} --execute 'sh java -version'".format(
            PARAMS["dataproc_cluster"], PARAMS["gcp_region"]
        ),
    )
    shell_node_prepare.set_downstream(shell_node)

    join_node = dummy_operator.DummyOperator(task_id="join_node", trigger_rule="all_success")

    def decision_node_decision():
        # `if "True":` tests a non-empty string, so it is always truthy and the
        # branch always returns "hdfs_node"; the "end" branch is unreachable.
        if "True":
            return "hdfs_node"
        else:
            return "end"

    decision_node = python_operator.BranchPythonOperator(
        python_callable=decision_node_decision, task_id="decision_node", trigger_rule="all_success"
    )

    hdfs_node = bash_operator.BashOperator(
        bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(
            dataproc_cluster=PARAMS["dataproc_cluster"],
            gcp_region=PARAMS["gcp_region"],
            bash_command=shlex.quote(
                "fs -mv /user/pig/examples/test_pig_node/output-data /user/pig/examples/test_pig_node/output-data-2"
            ),
        ),
        task_id="hdfs_node",
    )

    end = dummy_operator.DummyOperator(task_id="end", trigger_rule="all_success")

    # Dependencies: cleanup -> fork -> {pig, shell, subworkflow} -> join -> decision -> hdfs_node or end.
    fork_node.set_downstream(pig_node_prepare)
    join_node.set_downstream(decision_node)
    shell_node.set_downstream(join_node)
    decision_node.set_downstream(hdfs_node)
    cleanup_node_fs_1_delete.set_downstream(fork_node)
    fork_node.set_downstream(shell_node_prepare)
    subworkflow_node.set_downstream(join_node)
    decision_node.set_downstream(end)
    pig_node.set_downstream(join_node)
    hdfs_node.set_downstream(end)
    fork_node.set_downstream(subworkflow_node)