Skip to content

Instantly share code, notes, and snippets.

@jster1357
Last active November 2, 2022 15:16
Show Gist options
  • Save jster1357/9dcbf8346d170c3423bec036555ee009 to your computer and use it in GitHub Desktop.
Save jster1357/9dcbf8346d170c3423bec036555ee009 to your computer and use it in GitHub Desktop.
getCDFMetadataDAGComposer
import datetime
import re
from airflow import models
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionStartPipelineOperator
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
from airflow.providers.google.cloud.hooks.datafusion import PipelineStates
from airflow.utils.dates import days_ago
# Deployment-specific settings. Every value is a blank placeholder that must
# be filled in before the DAG can do anything useful.
PROJECT_ID = ""  # not referenced by the visible code in this file
REGION = ""  # passed as `location` to both start-pipeline operators
INSTANCE_NAME = ""  # passed as `instance_name` to both start-pipeline operators
PIPELINE_NAME0 = ""  # pipeline started by the "start_pipeline" task
PIPELINE_NAME1 = ""  # pipeline started by the "get_job_metadata" task
NAMESPACE = ""  # namespace for both operators; also forwarded as a runtime arg
CDF_ENDPOINT = ""  # forwarded to the second pipeline as the 'endpoint' runtime arg
def set_runtime_params():
    """Build the runtime arguments handed to the second Data Fusion pipeline.

    Reads the module-level ``PIPELINE_NAME0``, ``NAMESPACE`` and
    ``CDF_ENDPOINT`` settings and the module-level ``pipeline0`` task. The
    ``pipeline0`` name is looked up when this function is called, so it must
    only be called after ``pipeline0`` has been created.

    Returns:
        A dict with the keys ``pipeline_name``, ``namespace``, ``endpoint``
        and ``runid``; ``runid`` holds ``pipeline0.output``.
    """
    return {
        'pipeline_name': PIPELINE_NAME0,
        'namespace': NAMESPACE,
        'endpoint': CDF_ENDPOINT,
        # NOTE(review): presumably resolves to the first pipeline's run id
        # when the downstream task executes -- confirm against what the
        # start-pipeline operator returns in the installed provider version.
        'runid': pipeline0.output,
    }
# Runtime arguments for the first pipeline. This name must exist before the
# "start_pipeline" operator below is constructed, otherwise parsing the DAG
# file fails with NameError. It is an empty placeholder, like the other
# settings in this file; fill in key/value pairs as needed.
args0 = {}

# DAG that runs two Cloud Data Fusion pipelines back to back: the first
# pipeline, then a second one that receives details about the first run as
# its runtime arguments.
with models.DAG(
    "call_jobs",  # Dag id
    schedule_interval=None,  # no schedule; runs only when triggered
    start_date=days_ago(1),
) as dag:
    # Start the first pipeline. With `success_states` set, the operator waits
    # (up to `pipeline_timeout` seconds) for the run to reach COMPLETED.
    pipeline0 = CloudDataFusionStartPipelineOperator(
        location=REGION,
        pipeline_name=PIPELINE_NAME0,
        instance_name=INSTANCE_NAME,
        runtime_args=args0,
        success_states=[PipelineStates.COMPLETED],
        pipeline_timeout=3600,
        namespace=NAMESPACE,
        task_id="start_pipeline",
    )

    # Start the second pipeline. set_runtime_params() reads `pipeline0`, so
    # it has to be called after the operator above has been created.
    pipeline1 = CloudDataFusionStartPipelineOperator(
        location=REGION,
        pipeline_name=PIPELINE_NAME1,
        instance_name=INSTANCE_NAME,
        runtime_args=set_runtime_params(),
        success_states=[PipelineStates.COMPLETED],
        pipeline_timeout=3600,
        namespace=NAMESPACE,
        task_id="get_job_metadata",
    )

    # Run the second pipeline only after the first one has finished.
    pipeline0 >> pipeline1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment