Skip to content

Instantly share code, notes, and snippets.

@pascaldelange
Last active March 25, 2021 13:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pascaldelange/4d0859106b7e943872a78bfeff575a2a to your computer and use it in GitHub Desktop.
Save pascaldelange/4d0859106b7e943872a78bfeff575a2a to your computer and use it in GitHub Desktop.
from datetime import datetime
from airflow import DAG
from airflow import models
from airflow.contrib.operators.dataflow_operator import DataflowTemplateOperator
from dataflow.utils import generate_default_args
from config.services import define_slack_callback
# Name of the Dataflow job; reused below for the DAG id, the default args
# and the failure callback.
JOB_NAME = "shiny-dataflow-job"

default_args = generate_default_args(job_name=JOB_NAME)

# The id shown on the Airflow DAG page: the job name with dashes replaced by
# underscores, prefixed with "dataflow_".
DAG_ID = "dataflow_{}".format(JOB_NAME.replace("-", "_"))

# Callback attached to the DAG's failure hook; built by the project helper
# (intended to post a Slack message on failure).
slack_failure_callback = define_slack_callback(job_name=JOB_NAME, is_dataflow=True)

# Main object
dag = DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    start_date=datetime(2021, 1, 4),
    schedule_interval="0 14 * * *",  # Every day at 14:00
    catchup=False,
    tags=["dataflow"],
    on_failure_callback=slack_failure_callback,
)
# Task that launches the Dataflow template located at
# gs://<DATAFLOW_BUCKET>/templates/<JOB_NAME>, where DATAFLOW_BUCKET is an
# Airflow Variable.
#
# The bucket is referenced through a Jinja expression rather than a
# module-level models.Variable.get() call: `template` is a templated field of
# DataflowTemplateOperator, so the Variable is looked up when the task runs
# instead of on every parse of this file by the scheduler.
start_template_job_dag = DataflowTemplateOperator(
    task_id="dataflow_job",
    template="gs://{{ var.value.DATAFLOW_BUCKET }}/templates/" + JOB_NAME,
    job_name=JOB_NAME,
    dag=dag,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment