Last active
November 2, 2022 15:16
-
-
Save jster1357/9dcbf8346d170c3423bec036555ee009 to your computer and use it in GitHub Desktop.
getCDFMetadataDAGComposer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import re

from airflow import models
from airflow.providers.google.cloud.hooks.datafusion import PipelineStates
from airflow.providers.google.cloud.operators.datafusion import CloudDataFusionStartPipelineOperator
from airflow.providers.google.cloud.sensors.datafusion import CloudDataFusionPipelineStateSensor
from airflow.utils.dates import days_ago

# Deployment settings. Every value is an empty placeholder in this template
# and must be filled in before the DAG is deployed.
PROJECT_ID = ""  # not referenced in the visible part of this file
REGION = ""  # passed as ``location`` to both start-pipeline operators
INSTANCE_NAME = ""  # passed as ``instance_name`` to both operators
PIPELINE_NAME0 = ""  # pipeline started by the first task
PIPELINE_NAME1 = ""  # pipeline started by the second task
NAMESPACE = ""  # passed as ``namespace`` and forwarded as a runtime argument
CDF_ENDPOINT = ""  # forwarded as the ``endpoint`` runtime argument
def set_runtime_params():
    """Build the runtime-argument dict handed to the second pipeline.

    Reads the module-level ``PIPELINE_NAME0``, ``NAMESPACE`` and
    ``CDF_ENDPOINT`` settings together with ``pipeline0.output``. Because it
    looks up the module-level name ``pipeline0``, it must only be called
    after that task object has been created.

    Returns:
        A dict with the keys ``pipeline_name``, ``namespace``, ``endpoint``
        and ``runid``.
    """
    return {
        "pipeline_name": PIPELINE_NAME0,
        "namespace": NAMESPACE,
        "endpoint": CDF_ENDPOINT,
        # NOTE(review): presumably this resolves at run time to the run id
        # returned by the first task -- confirm against the installed
        # provider version.
        "runid": pipeline0.output,
    }
# Runtime arguments for the first pipeline. The script passed ``args0`` to the
# operator without ever defining it, which raises NameError when the DAG file
# is parsed. An empty dict keeps the file importable.
# TODO: populate with the arguments the first pipeline actually needs.
args0 = {}

# Two-step DAG: start the first Data Fusion pipeline, then start a second
# pipeline whose runtime arguments include the first task's output.
with models.DAG(
    "call_jobs",  # Dag id
    schedule_interval=None,
    start_date=days_ago(1),
) as dag:
    pipeline0 = CloudDataFusionStartPipelineOperator(
        location=REGION,
        pipeline_name=PIPELINE_NAME0,
        instance_name=INSTANCE_NAME,
        runtime_args=args0,
        success_states=[PipelineStates.COMPLETED],
        pipeline_timeout=3600,
        namespace=NAMESPACE,
        task_id="start_pipeline",
    )

    # set_runtime_params() reads ``pipeline0.output``, so it has to be called
    # after ``pipeline0`` is assigned above.
    pipeline1 = CloudDataFusionStartPipelineOperator(
        location=REGION,
        pipeline_name=PIPELINE_NAME1,
        instance_name=INSTANCE_NAME,
        runtime_args=set_runtime_params(),
        success_states=[PipelineStates.COMPLETED],
        pipeline_timeout=3600,
        namespace=NAMESPACE,
        task_id="get_job_metadata",
    )

    # Explicit ordering: the second pipeline runs after the first.
    pipeline0 >> pipeline1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment