Skip to content

Instantly share code, notes, and snippets.

@linnil1
Created January 10, 2022 07:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save linnil1/e6490fa7e8d44532b7a8eb2405206ffa to your computer and use it in GitHub Desktop.
Save linnil1/e6490fa7e8d44532b7a8eb2405206ffa to your computer and use it in GitHub Desktop.
"""
Airflow Example: Build a dynamic read-mapping DAG for multiple samples
Author: linnil1
Run bowtie2 (a read mapper) on two samples against a reference FASTA.
1. Prepare data like this
```
$ ls /home/linnil1/airflow/data/
chrx1.synthetic.read1.fq chrx1.synthetic.read2.fq chrx.fa chrx.synthetic.read1.fq chrx.synthetic.read2.fq
```
2. Set up Airflow
https://github.com/linnil1/cwl_example#run-with-airflow
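3. Set up the Airflow Variables (Admin > Variables). `reads` is a comma-separated list of read-file prefixes (each prefix needs `.read1.fq` and `.read2.fq` files); `reads` and `reference_fa` are paths relative to `basic_folder`.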
```
basic_folder /home/linnil1/airflow/
reads data/chrx.synthetic,data/chrx1.synthetic
reference_fa data/chrx.fa
```
4. Move this file to airflow/dags
5. Trigger it
"""
from airflow import DAG
from textwrap import dedent
from datetime import datetime, timedelta
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.exceptions import AirflowSkipException
from airflow.models import Variable
from airflow.utils.task_group import TaskGroup
import os
from airflow.exceptions import AirflowSkipException
# Utilities
def getBasename(path):
    """
    Strip the last extension from `path`, keeping any directory part.

    Despite the name, this is not os.path.basename: directories are kept
    and only the final extension is removed. It is also registered as the
    `getBasename` Jinja filter on the DAG.

    Input: "a/b/c.d.txt"
    Output: "a/b/c.d"
    """
    root, _ext = os.path.splitext(path)
    return root
def nameToKey(path):
    """
    Turn a file path into an identifier-friendly key.

    Every "/" and "." is replaced by "_", e.g.
    "data/chrx.synthetic" -> "data_chrx_synthetic".
    """
    table = str.maketrans({"/": "_", ".": "_"})
    return path.translate(table)
def fileExist(**context):
    """
    ShortCircuitOperator callable: skip this task when the target file exists.

    The path comes from `context['params']['file']`. A relative path is
    resolved against the `basic_folder` Airflow Variable, which is the folder
    the podman commands mount at /app and use as their working directory.
    If that Variable is unset, the path is checked relative to the current
    working directory, as before.

    Returns True when the file is missing, so the step runs.
    Raises AirflowSkipException when the file already exists. Raising marks
    only this task as skipped; returning False would make ShortCircuitOperator
    skip all downstream tasks. See
    https://stackoverflow.com/questions/51725746/airflow-run-a-task-when-some-upstream-is-skipped-by-shortcircuit
    Raises KeyError if no 'file' param was supplied.
    """
    path = context['params']['file']
    if not os.path.isabs(path):
        # The commands run in basic_folder, not in the worker's CWD, so check
        # the file there as well.
        base = Variable.get("basic_folder", default_var=None)
        if base:
            path = os.path.join(base, path)
    print(f"Checking whether {path} exists")
    if os.path.exists(path):
        raise AirflowSkipException(f"Skip: {path} already exists.")
    return True
def fileExistOperator(file_path):
    """
    Wrap fileExist in a ShortCircuitOperator that checks `file_path`.

    The task id is "check_exist_" followed by nameToKey(file_path). The
    NONE_FAILED trigger rule lets this check run even when upstream tasks
    were skipped, so each step decides for itself whether it has to run.
    """
    check_id = f"check_exist_{nameToKey(file_path)}"
    return ShortCircuitOperator(
        python_callable=fileExist,
        task_id=check_id,
        trigger_rule=TriggerRule.NONE_FAILED,
        params=dict(file=file_path),
    )
# Bowtie2 read-mapping task group (one per sample)
def bowtieMap(name):
    """
    Build the TaskGroup that maps one paired-end sample with bowtie2, then
    sorts and indexes the alignment with samtools.

    Input: `name`, a read-file prefix relative to `basic_folder`
    (e.g. "data/chrx.synthetic"). The files `{name}.read1.fq` and
    `{name}.read2.fq` are passed to bowtie2 as -1 and -2.
    Output: the TaskGroup `bowtie_{nameToKey(name)}` containing:
      check_exist_* -> skipped when `{name}.bowtie2.bam.bai` already exists;
                       the skip cascades to the tasks below under their
                       default trigger rule
      bowtie_map    -> writes `{name}.bowtie2.sam`
      samtool       -> writes `{name}.bowtie2.bam` and indexes it
    The bowtie2 index prefix is taken from the `reference_fa` Variable at
    render time through the `getBasename` Jinja filter.
    """
    # Shared prefix for every containerised command. basic_folder is mounted
    # at /app and used as the working directory, so all paths are relative to
    # it. This is a plain (non-f) string because {{ ... }} is Jinja that
    # Airflow renders, not a Python format field.
    container = (
        "podman run -it --rm -v {{ var.value.basic_folder }}:/app "
        "-w /app {{ params.image }} "
    )
    with TaskGroup(group_id=f'bowtie_{nameToKey(name)}') as tg:
        map_exist = fileExistOperator(name + ".bowtie2.bam.bai")
        task_map = BashOperator(
            task_id='bowtie_map',
            bash_command=(
                container
                + "bowtie2 --threads 32 "
                + "-x {{ var.value.reference_fa | getBasename }}.bowtie2 "
                + f"-1 {name}.read1.fq -2 {name}.read2.fq "
                + f"-S {name}.bowtie2.sam\n"
            ),
            params={
                'image': 'quay.io/biocontainers/bowtie2:2.4.4--py39hbb4e92a_0',
            },
        )
        map_exist >> task_map

        prefix = name + ".bowtie2"
        # Chain with && so a failed sort fails the task. Otherwise the bash
        # script's exit status is that of the last command (index), and a
        # stale .bam could let the task report success.
        task_samtool = BashOperator(
            task_id='samtool',
            bash_command=(
                container + f"samtools sort -@32 {prefix}.sam -o {prefix}.bam"
                + " && "
                + container + f"samtools index -@32 {prefix}.bam\n"
            ),
            params={
                'image': "quay.io/biocontainers/samtools:1.10--h9402c20_2",
            },
        )
        task_map >> task_samtool
    return tg
# Bowtie2 index-building task group
def bowtieBuild(reference_fa):
    """
    Build the TaskGroup that creates the bowtie2 index for a reference FASTA.

    Input: `reference_fa`, a FASTA path relative to `basic_folder`
    (e.g. "data/chrx.fa").
    Output: the TaskGroup `bowtie_build`. The index is written with the prefix
    `{getBasename(reference_fa)}.bowtie2`. An existence check on
    `{prefix}.1.bt2` skips the build when the index is already there.
    """
    # Derive the index prefix from the argument once, so the existence check
    # and the build command always refer to the same files. Previously the
    # command used the `reference_fa` Variable and ignored this argument.
    index_prefix = getBasename(reference_fa) + ".bowtie2"
    # basic_folder is mounted at /app and used as the working directory.
    # Plain string because {{ ... }} is Jinja rendered by Airflow.
    container = (
        "podman run -it --rm -v {{ var.value.basic_folder }}:/app "
        "-w /app {{ params.image }} "
    )
    with TaskGroup(group_id='bowtie_build') as tg:
        index_exist = fileExistOperator(index_prefix + ".1.bt2")
        task_build = BashOperator(
            task_id='bowtie_build',
            bash_command=(
                container
                + f"bowtie2-build --threads 32 {reference_fa} {index_prefix}\n"
            ),
            params={
                'image': 'quay.io/biocontainers/bowtie2:2.4.4--py39hbb4e92a_0',
            },
        )
        index_exist >> task_build
    return tg
# main
with DAG(
    'bowtie_all',
    description='Build index and read mapping',
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    # Exposes `{{ path | getBasename }}` to the Jinja-templated bash commands.
    user_defined_filters={
        'getBasename': getBasename,
    },
) as dag:
    dag_build = bowtieBuild(Variable.get("reference_fa"))
    # The mapping groups are created from the `reads` Variable whenever this
    # file is parsed, so editing the Variable changes the DAG's shape.
    # Entries are stripped and blanks dropped, so "a, b," yields ["a", "b"]
    # rather than a name with a leading space plus an empty sample.
    reads = [read.strip() for read in Variable.get("reads").split(",")]
    dag_maps = [bowtieMap(name) for name in reads if name]
    # Every mapping group waits for the index-building group.
    dag_build >> dag_maps
@linnil1
Copy link
Author

linnil1 commented Jan 10, 2022

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment