Skip to content

Instantly share code, notes, and snippets.

@cordon-thiago
Last active December 4, 2020 18:48
Show Gist options
  • Save cordon-thiago/4529fe53433062837ed183a27d5b1cb3 to your computer and use it in GitHub Desktop.
Save cordon-thiago/4529fe53433062837ed183a27d5b1cb3 to your computer and use it in GitHub Desktop.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator
from datetime import datetime, timedelta
###############################################
# Job parameters
###############################################
# Path handed to the Spark application as its single command-line argument.
file_path = "/usr/local/spark/resources/data/airflow.cfg"
# Name given to the submitted Spark application.
spark_app_name = "Spark Hello World"
# Value used for the "spark.master" entry of the Spark configuration.
spark_master = "spark://spark:7077"
###############################################
# DAG Definition
###############################################
# Default task arguments handed to the DAG below.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    # Keep start_date static. A value derived from datetime.now() is
    # recomputed every time this file is parsed, so it keeps moving forward
    # and the first daily interval never closes -- the scheduler would then
    # never create a scheduled run.
    "start_date": datetime(2020, 1, 1),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
}

# Daily DAG. catchup is disabled so that the fixed start_date above does not
# cause a backfill of every interval between that date and today.
dag = DAG(
    "spark-test",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
)
# Placeholder task at the head of the pipeline.
start = DummyOperator(dag=dag, task_id="start")

# Submits the hello-world Spark application, passing file_path as its
# only argument.
spark_job = SparkSubmitOperator(
    dag=dag,
    task_id="spark_job",
    conn_id="spark_default",
    name=spark_app_name,
    # Spark application path created in airflow and spark cluster
    application="/usr/local/spark/app/hello-world.py",
    application_args=[file_path],
    conf={"spark.master": spark_master},
    verbose=1,
)

# Placeholder task at the tail of the pipeline.
end = DummyOperator(dag=dag, task_id="end")

# Linear ordering: start, then spark_job, then end.
start.set_downstream(spark_job)
spark_job.set_downstream(end)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment