|
import defopt |
|
from pathlib import Path |
|
import pysam |
|
from typing import List |
|
from typing import Optional |
|
|
|
|
|
def main(*, |
|
in_bam: Path, |
|
out_bam: Path, |
|
in_tag: str, |
|
delim: str, |
|
out_tags: Optional[List[str]] = None, |
|
primary_delim: Optional[str] = None) -> None: |
|
"""Splits a SAM tag based on a delimiter and stores each value in its own tag. |
|
|
|
If `--out-tags` is not given, will use hexidecimal tag keys starting at `a0`. |
|
|
|
All values must have the same # of values. |
|
|
|
Args: |
|
in_bam: the path to the input BAM |
|
out_bam: the path to the input BAM |
|
in_tag: the SAM tag with value to split |
|
delim: the delimiter to split by |
|
out_tags: the list of output SAM tags to use; must have the same # as the # of values after splitting |
|
primary_delim: if multiple values are present, use this to split to give the first value |
|
""" |
|
with pysam.AlignmentFile(str(in_bam)) as reader, pysam.AlignmentFile(str(out_bam), mode="wb", header=reader.header) as writer: |
|
for record in reader: |
|
if not record.has_tag(in_tag): |
|
writer.write(record) |
|
continue |
|
value = record.get_tag(in_tag) |
|
|
|
if not isinstance(value, str): |
|
writer.write(record) |
|
continue |
|
|
|
if primary_delim is not None: |
|
value = value.split(primary_delim)[0] |
|
|
|
values = value.split(delim) |
|
|
|
if out_tags is None: |
|
int_value = int('a0', 16) |
|
out_tags = [] |
|
for _ in range(len(values)): |
|
out_tags.append(hex(int_value)[2:]) |
|
int_value += 1 |
|
|
|
assert len(values) == len(out_tags), ( |
|
f"Length mismatch in values (found: {len(values)}, expected: {len(out_tags)})" |
|
) |
|
for value, out_tag in zip(values, out_tags): |
|
assert not record.has_tag(out_tag), f"Record already has SAM tag '{out_tag}': {record}" |
|
record.set_tag(out_tag, value) |
|
|
|
writer.write(record) |
|
|
|
|
|
if __name__ == '__main__': |
|
defopt.run(main) |