Skip to content

Instantly share code, notes, and snippets.

@oman36
Created April 24, 2022 18:19
Show Gist options
  • Save oman36/87c8e452ffb51f2ba6f9410e358ed074 to your computer and use it in GitHub Desktop.
Save oman36/87c8e452ffb51f2ba6f9410e358ed074 to your computer and use it in GitHub Desktop.
Makes a Mermaid flowchart from an Apache Airflow DAG.
from collections import (
defaultdict,
deque,
)
def dag_as_mermaid(dag) -> str:
    """
    Build a Mermaid flowchart definition from an Apache Airflow DAG.

    See https://mermaid-js.github.io/mermaid/#/./flowchart
    A compact, text-only way to store a DAG representation.

    The graph is walked breadth-first starting from ``dag.roots``. Every
    edge ``task -> downstream task`` becomes one ``a(a) --> b(b)`` row; a
    root task without any downstream task is emitted as a bare node id so
    that it still shows up in the chart. Tasks sharing the same
    ``ui_color`` / ``ui_fgcolor`` pair are grouped under one generated
    ``classDef`` (``class0``, ``class1``, ...).

    Args:
        dag: Object exposing ``roots`` (iterable of tasks) and
            ``orientation`` (used verbatim as the flowchart direction).
            Each task must expose ``task_id``, ``ui_color``,
            ``ui_fgcolor`` and ``downstream_list``.

    Returns:
        The flowchart source: the header row, then the relation rows, then
        the style rows, joined with a newline followed by a single space.
    """
    # NOTE(review): task ids are written into the chart verbatim; ids that
    # Mermaid treats specially may need escaping -- confirm against the
    # Mermaid flowchart documentation.
    queue = deque(dag.roots)
    history = set()            # task ids already queued or emitted as bare nodes
    relations = []             # output rows, in discovery order
    seen_relations = set()     # mirror of `relations` for O(1) duplicate checks
    style2class = {}           # style string -> generated class name
    style_class_tasks = defaultdict(list)  # class name -> task ids using it

    while queue:
        task = queue.popleft()
        task_id = task.task_id

        # Register the task under the class matching its colours, creating
        # a new numbered class the first time a style is encountered.
        style = f'fill:{task.ui_color},color:{task.ui_fgcolor}'
        if style not in style2class:
            style2class[style] = f'class{len(style2class)}'
        style_class_tasks[style2class[style]].append(task_id)

        downstream = task.downstream_list
        if not downstream and task_id not in history:
            # Isolated root: no edge will ever mention it, so emit the
            # node on its own row.
            relations.append(task_id)
            seen_relations.add(task_id)
            history.add(task_id)

        for next_task in downstream:
            next_id = next_task.task_id
            relation = f'{task_id}({task_id}) --> {next_id}({next_id})'
            if relation not in seen_relations:
                seen_relations.add(relation)
                relations.append(relation)
            if next_id not in history:
                # Queue each downstream task only once, even when several
                # upstream tasks point at it.
                history.add(next_id)
                queue.append(next_task)

    # All class definitions come first, then the task-to-class assignments.
    styles = [f'classDef {cls} {style};' for style, cls in style2class.items()]
    styles.extend(
        f'class {",".join(tasks)} {cls};'
        for cls, tasks in style_class_tasks.items()
    )

    rows = [
        f'flowchart {dag.orientation}',
        *relations,
        *styles,
    ]
    return '\n '.join(rows)
if __name__ == '__main__':
    # Command-line entry point: print the Mermaid flowchart of the DAG whose
    # id is given as the first argument.
    import sys

    if len(sys.argv) < 2:
        # Fail with a usage hint instead of an IndexError traceback.
        sys.exit(f'usage: {sys.argv[0]} DAG_ID')

    from airflow.models.dagbag import DagBag

    requested_id = sys.argv[1]
    requested_dag = DagBag().get_dag(dag_id=requested_id)
    if requested_dag is None:
        # Per Airflow's DagBag API, get_dag yields None for an id that is not
        # in the bag; report that rather than crashing inside dag_as_mermaid.
        sys.exit(f'DAG {requested_id!r} was not found')
    print(dag_as_mermaid(requested_dag))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment