Skip to content

Instantly share code, notes, and snippets.

@cr3a7ure
Forked from rentzso/trigger_subdag.py
Created December 4, 2020 18:41
Show Gist options
  • Save cr3a7ure/058aab3e1692fd6e15182285c339d9b7 to your computer and use it in GitHub Desktop.
Save cr3a7ure/058aab3e1692fd6e15182285c339d9b7 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import copy
from airflow.operators.subdag_operator import SubDagOperator
from airflow.models import DagRun
from airflow.utils.db import create_session
class ExtendedBashOperator(BashOperator):
template_fields = ()
on_pre_execute_template_fields = BashOperator.template_fields
def pre_execute(self, context):
if hasattr(self, 'dag'):
if self.dag.user_defined_macros:
context.update(
self.dag.user_defined_macros)
if self.dag.is_subdag and self.params.get('parent_dag_id'):
with create_session() as session:
parent_dag_run = (
session.query(DagRun)
.filter_by(
dag_id=self.params['parent_dag_id'],
execution_date=context['execution_date'])
.first()
)
context['parent_dag_run'] = parent_dag_run
rt = self.render_template # shortcut to method
for attr in self.__class__.on_pre_execute_template_fields:
content = getattr(self, attr)
if content:
rendered_content = rt(attr, content, context)
setattr(self, attr, rendered_content)
super(ExtendedBashOperator, self).pre_execute(context)
def create_subdag(parent_dag, subdag_task_id):
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 7, 28),
'retries': 0,
'retry_delay': timedelta(minutes=5)
}
sub_dag = DAG('{}.{}'.format(parent_dag, subdag_task_id), default_args=default_args, schedule_interval=None)
bash_task = ExtendedBashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: {{ parent_dag_run.conf["message"] if parent_dag_run else "" }}" ',
params = {
'parent_dag_id': parent_dag
},
dag=sub_dag)
return sub_dag
args = {
'start_date': datetime.utcnow(),
'owner': 'airflow',
}
dag = DAG(
dag_id='trigger_subdag',
default_args=args,
schedule_interval=None)
def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
print("this is ds: {}".format(str(ds)))
return ds
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag)
# You can also access the DagRun object in templates
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: {{ dag_run.conf["message"] if dag_run else "" }}" ',
dag=dag)
subdag_operator = SubDagOperator(
task_id="subdag",
subdag=create_subdag(dag.dag_id, "subdag"),
dag=dag
)
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
import copy
from airflow.operators.subdag_operator import SubDagOperator
from airflow.models import DagRun
from airflow.utils.db import create_session
def get_parent_run(parent_dag_id, **kwargs):
with create_session() as session:
parent_dag_run = (
session.query(DagRun)
.filter_by(
dag_id=parent_dag_id,
execution_date=kwargs['execution_date'])
.first()
)
return parent_dag_run
def create_subdag(parent_dag, subdag_task_id):
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 7, 28),
'retries': 0,
'retry_delay': timedelta(minutes=5)
}
sub_dag = DAG('{}.{}'.format(parent_dag, subdag_task_id), default_args=default_args, schedule_interval=None)
get_parent_run_task = PythonOperator(
task_id="get_parent_run",
python_callable=get_parent_run,
provide_context=True,
op_kwargs={
'parent_dag_id': parent_dag
},
dag=sub_dag
)
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: {{ ti.xcom_pull("get_parent_run").conf["message"] }}" ',
dag=sub_dag)
get_parent_run_task >> bash_task
return sub_dag
args = {
'start_date': datetime.utcnow(),
'owner': 'airflow',
}
dag = DAG(
dag_id='trigger_subdag_new',
default_args=args,
schedule_interval=None)
def run_this_func(ds, **kwargs):
print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
print("this is ds: {}".format(str(ds)))
return ds
run_this = PythonOperator(
task_id='run_this',
provide_context=True,
python_callable=run_this_func,
dag=dag)
# You can also access the DagRun object in templates
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: {{ dag_run.conf["message"] if dag_run else "" }}" ',
dag=dag)
subdag_operator = SubDagOperator(
task_id="subdag",
subdag=create_subdag(dag.dag_id, "subdag"),
dag=dag
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment