Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
def set_up_dag_run(context, dag_run_obj):
    """Populate a DAG-run order object before the run is triggered.

    Passed as ``python_callable`` to ``TriggerDagRunOperator`` in
    ``launch_workflow_command``, which calls ``execute`` with a context of
    the form ``{"config": <effective configuration>}``.

    Args:
        context: Mapping that must contain a ``"config"`` key; its value is
            forwarded unchanged as the run payload.
        dag_run_obj: Mutable object whose ``payload`` and ``run_id``
            attributes are overwritten.

    Returns:
        The same ``dag_run_obj`` instance, after mutation.

    Raises:
        KeyError: If ``context`` has no ``"config"`` entry.
    """
    dag_run_obj.payload = {"config": context["config"]}
    # A random UUID gives every triggered run its own distinct run_id.
    dag_run_obj.run_id = str(uuid4())
    # Call form (not the Python 2 statement) so the module parses on both
    # Python 2 and Python 3; output is the same for a single argument.
    print(context)
    return dag_run_obj
def launch_workflow_command(args):
    """Launch a workflow once for every configuration file in a directory.

    Walks ``args.config_location`` recursively. For each file found, a
    configuration and an analysis run are created, the run is marked as
    scheduled, and the trigger operator is executed with the run's
    effective configuration (augmented with its ``analysis_run_id``).

    Args:
        args: Object exposing ``config_location``, ``analysis_id``,
            ``workflow_id`` and ``id_from_filename`` attributes
            (presumably a parsed command-line namespace; verify against
            the caller).

    Raises:
        ValueError: If ``args.config_location`` is not a directory.
    """
    config_dir = args.config_location
    analysis_id = args.analysis_id
    workflow_id = args.workflow_id
    workflow = get_workflow_by_id(workflow_id)
    use_filename_as_id = args.id_from_filename

    if not os.path.isdir(config_dir):
        raise ValueError("config_location must be a path to a directory")

    # One operator instance is built up front and reused for every file.
    trigger = TriggerDagRunOperator(
        dag_id=workflow.workflow_name,
        python_callable=set_up_dag_run,
        task_id="run_my_workflow",
        owner="airflow")

    for dirpath, _subdirs, filenames in os.walk(config_dir):
        for filename in filenames:
            config_path = os.path.join(dirpath, filename)
            config = create_configuration_from_file(
                config_path, use_filename_as_id)

            analysis_run = create_analysis_run(
                analysis_id, config.config_id, workflow_id)
            set_scheduled(analysis_run)

            run_config = get_effective_configuration(
                analysis_run.analysis_run_id)
            # Record the run id inside the config handed to the trigger.
            run_config["analysis_run_id"] = analysis_run.analysis_run_id

            # set_up_dag_run reads this dict's "config" entry.
            trigger.execute({"config": run_config})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment