Skip to content

Instantly share code, notes, and snippets.

@joffilyfe
Created July 31, 2019 17:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joffilyfe/edcb957e83030fd0f582752809a16d99 to your computer and use it in GitHub Desktop.
Save joffilyfe/edcb957e83030fd0f582752809a16d99 to your computer and use it in GitHub Desktop.
import logging
import os
from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow.utils.state import State
from airflow.models import DagRun, clear_task_instances
from airflow.utils import timezone
from airflow import settings, models
# Reference (Airflow's DagRun model tests): https://github.com/apache/airflow/blob/6970b233964ee254bbb343ed8bdc906c2f7bd974/tests/models/test_dagrun.py
# Arguments handed to the DAG as its ``default_args``.
default_args = dict(
    owner="airflow",
    depends_on_past=False,
    start_date=datetime(2019, 7, 31),
)
# The "kernel_packages" DAG; no schedule interval is configured
# (schedule_interval=None).
dag = DAG(
    dag_id="kernel_packages", default_args=default_args, schedule_interval=None
)
def get_sps_packages(**kwargs):
    """Create one externally-triggered ``kernel-gate`` DagRun per package.

    For every package identifier a ``DagRun`` row in the ``RUNNING`` state is
    added to the session obtained from ``settings.Session()``. All rows are
    committed together once the loop finishes.

    Args:
        **kwargs: Keyword arguments supplied by the caller; accepted but not
            used by this function.

    Raises:
        Exception: Any error raised while building or persisting the runs is
            re-raised after the session has been rolled back.
    """
    packages = ["v1n2"]
    session = settings.Session()

    try:
        for package in packages:
            # Lazy logging args: the message is only formatted if emitted.
            logging.info("triggering an external dag with package %s", package)

            now = timezone.utcnow()
            dag_run = models.DagRun(
                dag_id="kernel-gate",
                # The timestamp keeps the run_id distinct between triggers.
                run_id="manual__%s_%s" % (package, now.isoformat()),
                execution_date=now,
                start_date=now,
                state=State.RUNNING,
                external_trigger=True,
            )
            session.add(dag_run)

        session.commit()
    except Exception:
        # Do not leave a half-finished transaction open on the connection.
        session.rollback()
        raise
    finally:
        # Always release the session; the original never closed it.
        session.close()
# Task that runs get_sps_packages inside the DAG; provide_context=True is set
# so the callable is invoked with keyword arguments.
get_sps_packages_task = PythonOperator(
    dag=dag,
    task_id="get_sps_packages_id",
    python_callable=get_sps_packages,
    provide_context=True,
)

# Bare expression with no runtime effect: this file defines a single task, so
# there are no dependencies to chain here.
get_sps_packages_task
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment