Skip to content

Instantly share code, notes, and snippets.

@utkarsharma2
Created April 1, 2021 13:54
Show Gist options
  • Save utkarsharma2/517797a7356c5168d5a99ee8b28fec00 to your computer and use it in GitHub Desktop.
Save utkarsharma2/517797a7356c5168d5a99ee8b28fec00 to your computer and use it in GitHub Desktop.
Dynamically Create Airflow DAG(s) via JSON.
"""Dag Factory"""
from datetime import datetime
from airflow import DAG
def create_dag(schedule, default_args, definition):
"""Create dags dynamically."""
with DAG(
definition["name"], schedule_interval=schedule, default_args=default_args
) as dag:
tasks = {}
for node in definition["nodes"]:
operator = load_operator(node["_type"])
params = node["parameters"]
node_name = node["name"].replace(" ", "")
params["task_id"] = node_name
params["dag"] = dag
tasks[node_name] = operator(**params)
for node_name, downstream_conn in definition["connections"].items():
for ds_task in downstream_conn:
tasks[node_name] >> tasks[ds_task]
globals()[definition["name"]] = dag
return dag
def load_operator(name):
"""Load operators dynamically"""
components = name.split('.')
mod = __import__(components[0])
for comp in components[1:]:
mod = getattr(mod, comp)
return mod
default_args = {
"owner": "airflow",
"start_date": datetime(2018, 1, 1),
"email": ["someEmail@gmail.com"],
"email_on_failure": False,
}
definition = {
"name": "demo_5",
"nodes": [
{
"name": "Start",
"_type": "airflow.operators.dummy_operator.DummyOperator",
"parameters": {},
},
{
"name": "HTTPRequest",
"_type": "airflow.operators.http_operator.SimpleHttpOperator",
"position": [
520,
360
],
"parameters": {
"http_conn_id": "dummyapi",
"method": "GET",
"endpoint": "/api/users/2",
"headers": {},
"options": {},
"xcom_push": True,
}
},
{
"name": "End",
"_type": "airflow.operators.dummy_operator.DummyOperator",
"parameters": {},
},
{
"name": "ExecuteCommand",
"_type": "airflow.operators.bash_operator.BashOperator",
"parameters": {
"bash_command": "echo test"
},
}
],
"connections": {
"Start": ["ExecuteCommand", "HTTPRequest"],
"HTTPRequest": ["End"],
"ExecuteCommand": ["End"]
}
}
create_dag(None, default_args, definition)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment