Skip to content

Instantly share code, notes, and snippets.

@vishnu667
Created July 6, 2020 19:39
Show Gist options
  • Save vishnu667/e37981b7855af9b2f76b3f5db1d398d9 to your computer and use it in GitHub Desktop.
Save vishnu667/e37981b7855af9b2f76b3f5db1d398d9 to your computer and use it in GitHub Desktop.
Dynamically load dags from yaml files
from airflow import DAG
from airflow import configuration
from datetime import date, timedelta, datetime
from deepmerge import Merger
from os import listdir
from os.path import isfile, join
import copy
import glob
import importlib
import os
import time
import yaml
# Merge strategy used to layer task kwargs over operator defaults:
# lists are overridden, dicts are merged key by key, and any other type
# (or a type conflict) is overridden by the incoming value.
merger = Merger([(list, ["override"]), (dict, ["merge"])], ["override"], ["override"])

# Root folder of the property files, read from the `property_folder` option
# of the `core` section of the Airflow configuration.
PROPERTY_DIR = configuration.get('core', 'property_folder')

# Per-operator default keyword arguments, looked up under the top-level
# `operators` key by mergeDefaultArgs.
# NOTE(review): yaml.FullLoader can build more than plain data types; if this
# file could ever come from an untrusted source, switch to yaml.safe_load.
# The context manager closes the file handle, which the bare open() leaked.
with open(PROPERTY_DIR + "/yaml_dags/params/default_operator_params.yml") as _default_params_file:
    defaultParams = yaml.load(_default_params_file, Loader=yaml.FullLoader)
def import_package(name):
    """Resolve a dotted path such as ``package.module.Attr`` to the object.

    The text after the last dot is looked up as an attribute on the module
    named by everything before it.

    Args:
        name: Fully qualified dotted path; must contain at least one dot.

    Returns:
        The attribute (typically a class) found on the imported module.
    """
    module_path, attribute = name.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, attribute)
def mergeDefaultArgs(kwargs, operatorName):
    """Layer a task's kwargs over the configured defaults for its operator.

    Args:
        kwargs: Keyword arguments given for the task.
        operatorName: Operator identifier used as the key into
            ``defaultParams['operators']``.

    Returns:
        ``kwargs`` itself when the operator has no defaults; otherwise a
        deep copy of the defaults with ``kwargs`` merged into it using the
        module-level ``merger``.
    """
    if operatorName not in defaultParams['operators']:
        return kwargs

    # Deep-copy first so the shared defaults are never modified by the merge.
    merged = copy.deepcopy(defaultParams['operators'][operatorName])
    merger.merge(merged, kwargs)
    return merged
def createTask(taskConfig, dag):
    """Instantiate the operator described by one task entry of a DAG config.

    Args:
        taskConfig: Mapping describing the task. ``operator`` is the dotted
            path of the operator class and ``task_name`` becomes the
            ``task_id``. ``args`` (positional arguments) and ``kwargs``
            (keyword arguments, layered over the operator defaults by
            ``mergeDefaultArgs``) are optional.
        dag: DAG object passed to the operator as ``dag``.

    Returns:
        The constructed operator instance.
    """
    operator_class = import_package(taskConfig['operator'])
    # A key that is absent, or present but left empty in the YAML (which
    # loads as None), is treated as "no arguments" instead of raising.
    positional = taskConfig.get('args') or []
    keyword = mergeDefaultArgs(taskConfig.get('kwargs') or {}, taskConfig['operator'])
    return operator_class(*positional, dag=dag, task_id=taskConfig['task_name'], **keyword)
def create_task_sequence(task_sequence, tasks):
    """Wire up task dependencies from a list of (upstream, downstream) pairs.

    Args:
        task_sequence: Iterable of sequences whose first two items are task
            names: the upstream task, then the downstream task.
        tasks: Mapping from task name to task object; each upstream object
            must provide ``set_downstream``.
    """
    for edge in task_sequence:
        upstream_task = tasks[edge[0]]
        upstream_task.set_downstream(tasks[edge[1]])
def dagDefaultArguments(config):
    """Build the ``default_args`` dict for a DAG from its config mapping.

    Args:
        config: DAG config mapping. ``dag_default_args`` supplies the base
            arguments (optional). ``start_date`` and ``end_date`` are
            optional strings in ``%Y,%m,%d`` format, e.g. ``"2020,07,06"``.

    Returns:
        A new dict with the base arguments plus ``start_date`` (always set)
        and ``end_date`` (only when configured) as ``datetime`` objects.

    Raises:
        ValueError: If a configured date string does not match ``%Y,%m,%d``.
    """
    # Work on a copy so the caller's config is not modified as a side
    # effect; a missing or empty `dag_default_args` means "no base args".
    default_arguments = dict(config.get('dag_default_args') or {})

    if 'start_date' in config:
        default_arguments['start_date'] = datetime.strptime(config['start_date'], '%Y,%m,%d')
    else:
        # NOTE(review): this fallback is evaluated each time the function
        # runs, so the start date is not stable between parses. Confirm this
        # is intended; a fixed start_date in the config avoids it.
        default_arguments['start_date'] = datetime.today()

    if 'end_date' in config:
        default_arguments['end_date'] = datetime.strptime(config['end_date'], '%Y,%m,%d')

    return default_arguments
def generateDag(config):
    """Create a DAG, its tasks, and their dependencies from a config mapping.

    Args:
        config: DAG config mapping with ``dag_name``, ``tasks`` (a list of
            task entries understood by ``createTask``) and ``task_sequence``
            (dependency pairs understood by ``create_task_sequence``), plus
            whatever ``dagDefaultArguments`` reads.

    Returns:
        The populated DAG object, scheduled with a one-day interval.
    """
    dag = DAG(
        config['dag_name'],
        default_args=dagDefaultArguments(config),
        schedule_interval=timedelta(days=1),
    )
    with dag:
        # Build every task first so dependencies can refer to any of them.
        tasks_by_name = {
            task_config['task_name']: createTask(task_config, dag)
            for task_config in config['tasks']
        }
        create_task_sequence(config['task_sequence'], tasks_by_name)
    return dag
# Discover every *.yml file directly inside the yaml_dags folder and register
# one DAG per active config as a module-level name.
property_folder = PROPERTY_DIR + "/yaml_dags/"

# Sorted so the load order (and which file wins when two configs share a
# dag_name) does not depend on the directory listing order.
files = [
    join(property_folder, entry)
    for entry in sorted(listdir(property_folder))
    if entry.endswith(".yml") and isfile(join(property_folder, entry))
]

for filePath in files:
    # NOTE(review): yaml.FullLoader can build more than plain data types; use
    # yaml.safe_load if these files could come from an untrusted source.
    with open(filePath) as config_file:
        config = yaml.load(config_file, Loader=yaml.FullLoader)

    # An empty YAML file loads as None and a list/scalar document is not a
    # DAG config; skip those instead of aborting the whole load.
    if not isinstance(config, dict) or not config.get('is_active'):
        continue

    print("Making Dag " + config['dag_name'])
    # Bind the DAG under its own name in this module's globals.
    globals()[config['dag_name']] = generateDag(config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment