Created
April 30, 2021 12:56
-
-
Save afonsoaugusto/c65779588d98a8b151c038a2d320c671 to your computer and use it in GitHub Desktop.
Airflow + Spark + k8s
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# SparkApplication manifest for the spark-on-k8s-operator.
# Runs the bundled Pi example in cluster mode; submitted by the Airflow DAG below
# (the DAG's SparkKubernetesOperator references this file as spark-pi.yml).
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: pyspark-pi
  namespace: spark-job        # must match the namespace used by the Airflow tasks
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "gcr.io/spark-operator/spark-py:v3.0.0"
  imagePullPolicy: Always
  # Example script shipped inside the Spark image (local:// = path in the container).
  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
  sparkVersion: "3.0.0"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10          # seconds between retries of a failed app
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: "3.0.0"                  # quoted: k8s label values must be strings
    serviceAccount: spark-spark         # SA with permissions to spawn executor pods
  executor:
    cores: 1
    instances: 3
    memory: "512m"
    labels:
      version: "3.0.0"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# [START import_module]
# Standard library.
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG.
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago
# [END import_module]
# [START default_args]
# Defaults applied to every task in the DAG unless overridden per-task.
default_args = {
    'owner': 'Afonso Rodrigues',
    'depends_on_past': False,  # each run is independent of the previous one
    'email': ['afonsoaugustoventura@gmail.com'],
    'email_on_failure': True,   # alert on task failure
    'email_on_retry': False,    # but stay quiet on automatic retries
    'retries': 3,               # retry a failed task up to 3 times
    'retry_delay': timedelta(minutes=5),  # wait 5 minutes between retries
}
# [END default_args]
# [START instantiate_dag]
# Daily DAG that submits the spark-pi SparkApplication and waits for it to finish.
dag = DAG(
    'spark_pi',
    default_args=default_args,
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),  # run once a day
    start_date=days_ago(1),
)
# [END instantiate_dag]
# Submit the SparkApplication manifest to the cluster via the spark operator.
t1 = SparkKubernetesOperator(
    task_id='spark_pi_submit',
    namespace="spark-job",            # must match metadata.namespace in spark-pi.yml
    application_file="spark-pi.yml",  # manifest path, relative to the DAG folder
    kubernetes_conn_id="kubernetes_default",
    do_xcom_push=True,  # push the created resource so the sensor can read its name
    dag=dag,
)
# Poll the submitted SparkApplication until it reaches a terminal state.
t2 = SparkKubernetesSensor(
    task_id='spark_pi_monitor',
    namespace="spark-job",
    # Resolve the application name at runtime from the submit task's XCom.
    application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
    kubernetes_conn_id="kubernetes_default",
    dag=dag,
)
t1 >> t2  # monitor only after submission succeeds
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment