@afonsoaugusto, April 30, 2021
Airflow + Spark + k8s
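Two pieces: the SparkApplication manifest (spark-pi.yml, referenced by the DAG's application_file) and an Airflow DAG that submits it with SparkKubernetesOperator and waits for it to finish with SparkKubernetesSensor.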
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: pyspark-pi
  namespace: spark-job
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "gcr.io/spark-operator/spark-py:v3.0.0"
  imagePullPolicy: Always
  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
  sparkVersion: "3.0.0"
  restartPolicy:
    type: OnFailure
    onFailureRetries: 3
    onFailureRetryInterval: 10
    onSubmissionFailureRetries: 5
    onSubmissionFailureRetryInterval: 20
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.0.0
    serviceAccount: spark-spark
  executor:
    cores: 1
    instances: 3
    memory: "512m"
    labels:
      version: 3.0.0
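The DAG below submits the manifest above through the kubernetes_default connection and then monitors the resulting SparkApplication by name, pulled from the submit task's XCom: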
from datetime import timedelta

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor
from airflow.utils.dates import days_ago
# [END import_module]

# [START default_args]
default_args = {
    'owner': 'Afonso Rodrigues',
    'depends_on_past': False,
    'email': ['afonsoaugustoventura@gmail.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}
# [END default_args]

# [START instantiate_dag]
dag = DAG(
    'spark_pi',
    default_args=default_args,
    description='submit spark-pi as sparkApplication on kubernetes',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(1),
)

# Apply the SparkApplication manifest (spark-pi.yml) in the spark-job namespace
# and push the created resource to XCom so the sensor can find it by name.
t1 = SparkKubernetesOperator(
    task_id='spark_pi_submit',
    namespace="spark-job",
    application_file="spark-pi.yml",
    kubernetes_conn_id="kubernetes_default",
    do_xcom_push=True,
    dag=dag,
)

# Wait for the submitted SparkApplication to reach a terminal state.
t2 = SparkKubernetesSensor(
    task_id='spark_pi_monitor',
    namespace="spark-job",
    application_name="{{ task_instance.xcom_pull(task_ids='spark_pi_submit')['metadata']['name'] }}",
    kubernetes_conn_id="kubernetes_default",
    dag=dag,
)

t1 >> t2
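For debugging outside Airflow, the state the sensor waits on can be read straight from the SparkApplication custom resource. The sketch below is not part of the gist; it assumes the kubernetes Python client is installed and a kubeconfig can reach the spark-job namespace, and it reads status.applicationState.state, which the spark operator sets to values such as RUNNING, COMPLETED, or FAILED.

# Minimal sketch (not part of the gist): read the SparkApplication status
# that the sensor effectively polls.
from kubernetes import client, config

config.load_kube_config()  # inside the cluster, use config.load_incluster_config()
api = client.CustomObjectsApi()

app = api.get_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="spark-job",
    plural="sparkapplications",
    name="pyspark-pi",
)
# The operator fills in status.applicationState.state (e.g. RUNNING, COMPLETED, FAILED).
state = app.get("status", {}).get("applicationState", {}).get("state")
print(state)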