Skip to content

Instantly share code, notes, and snippets.

@kamac
Last active January 5, 2021 10:24
Show Gist options
  • Save kamac/7112af78f1a9004142903d4fe6e387d4 to your computer and use it in GitHub Desktop.
Save kamac/7112af78f1a9004142903d4fe6e387d4 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import time
# Shared task settings; handed to the DAG below through its `default_args`
# parameter.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
    wait_for_downstream=False,
)
# DAG under test: "@daily" schedule, catchup disabled, max_active_runs=1.
dag = DAG(
    dag_id='test_active_runs',
    description='A simple tutorial DAG',
    default_args=default_args,
    start_date=datetime(2010, 2, 11, 10, 30, 0),
    schedule_interval="@daily",
    catchup=False,
    max_active_runs=1,
    concurrency=8,
)
def sleep(seconds=10, **kwargs):
    """Block the caller for ``seconds`` seconds.

    Args:
        seconds: Duration passed to ``time.sleep``. Defaults to 10, so
            callers that pass no duration get a 10-second pause.
        **kwargs: Accepted and ignored (the task that runs this callable is
            declared with ``provide_context=True``, so extra keyword
            arguments are presumably the task context).

    Returns:
        None.
    """
    time.sleep(seconds)
def done(**kwargs):
    """Print "Done!" framed above and below by a row of 80 asterisks.

    Any keyword arguments are accepted and ignored.
    """
    banner = "*" * 80
    for line in (banner, "Done!", banner):
        print(line)
# Two tasks created by instantiating PythonOperator: t1 runs `sleep`,
# t2 runs `done`, and t2 is wired downstream of t1.
t1 = PythonOperator(
    dag=dag,
    task_id='sleeper',
    python_callable=sleep,
    provide_context=True,
    depends_on_past=False,
    wait_for_downstream=False,
)
t2 = PythonOperator(
    dag=dag,
    task_id='done',
    python_callable=done,
    provide_context=True,
    depends_on_past=False,
    wait_for_downstream=False,
)
t1.set_downstream(t2)
# -*- coding: utf-8 -*-
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.utils import timezone
from airflow.utils.state import State
from airflow.models import DagBag, DagModel
import time
# Shared task settings; handed to the trigger DAG below through its
# `default_args` parameter.
default_args = dict(
    owner='airflow',
    depends_on_past=False,
    email=['airflow@example.com'],
    email_on_failure=False,
    email_on_retry=False,
    retries=0,
    wait_for_downstream=False,
)
# Trigger DAG: schedule_interval=None, so it has no schedule of its own.
dag = DAG(
    dag_id='test_active_runs_trigger',
    description='Manually trigger test_active_runs DAG',
    default_args=default_args,
    start_date=datetime(2010, 2, 11, 10, 30, 0),
    schedule_interval=None,
    catchup=False,
    max_active_runs=1,
    concurrency=8,
)
def trigger_dag(**kwargs):
    """Create a RUNNING DagRun for the ``test_active_runs`` DAG.

    Looks up the DAG's file location through ``DagModel``, loads that
    location with a ``DagBag``, and calls ``create_dagrun`` on the loaded DAG
    using the current UTC time as the execution date.

    Args:
        **kwargs: Accepted and ignored (the task that runs this callable is
            declared with ``provide_context=True``, so these are presumably
            the task context).

    Raises:
        AirflowException: If the DAG has no ``DagModel`` row or cannot be
            loaded from its recorded file location.
    """
    # Imported locally so the module's top-level imports stay untouched.
    from airflow.exceptions import AirflowException

    dag_id = "test_active_runs"

    dag_model = DagModel.get_dagmodel(dag_id)
    if dag_model is None:
        # Without this check the failure would be an AttributeError on
        # `.fileloc`, which hides the real cause.
        raise AirflowException(
            "DAG '{0}' was not found in the DagModel table".format(dag_id)
        )

    # Named `target_dag` so it does not shadow the module-level `dag`.
    target_dag = DagBag(dag_folder=dag_model.fileloc).get_dag(dag_id)
    if target_dag is None:
        raise AirflowException(
            "DAG '{0}' could not be loaded from {1}".format(
                dag_id, dag_model.fileloc
            )
        )

    execution_date = timezone.utcnow()
    target_dag.create_dagrun(
        run_id="manual__{0}".format(execution_date.isoformat()),
        execution_date=execution_date,
        state=State.RUNNING,
        # NOTE(review): the run id is prefixed "manual__" but the run is
        # flagged as not externally triggered; kept as in the original --
        # confirm this is intentional.
        external_trigger=False,
    )
# The only task in this DAG: a PythonOperator that runs `trigger_dag`.
t1 = PythonOperator(
    dag=dag,
    task_id='triggerer',
    python_callable=trigger_dag,
    provide_context=True,
    depends_on_past=False,
    wait_for_downstream=False,
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment