|
import defopt |
|
import typing |
|
import pathlib |
|
import itertools as it |
|
import operator as op |
|
import heapq |
|
import gzip |
|
import os |
|
import sys |
|
|
|
from dataclasses import dataclass, field |
|
|
|
|
|
from typing import Any |
|
|
|
@dataclass(order=False) |
|
class PerBase: |
|
REF: str=field(compare=False) |
|
POS: int |
|
REF_BASE: str=field(compare=False) |
|
DEPTH: int=field(compare=False) |
|
A: int=field(compare=False) |
|
C: int=field(compare=False) |
|
G: int=field(compare=False) |
|
T: int=field(compare=False) |
|
N: int=field(compare=False) |
|
INS: int=field(compare=False) |
|
DEL: int=field(compare=False) |
|
REF_SKIP: int=field(compare=False) |
|
FAIL: int=field(compare=False) |
|
i: int=field(compare=False, default=0) |
|
tid: int=field(default=0) # compared! |
|
sample: str=field(compare=False, default="UNKNOWN") |
|
|
|
|
|
def __lt__(self, o): |
|
if self.tid != o.tid: |
|
return op.lt(self.tid, o.tid) |
|
return op.lt(self.POS, o.POS) |
|
|
|
def __gt__(self, o): |
|
if self.tid != o.tid: |
|
return op.gt(self.tid, o.tid) |
|
return op.gt(self.POS, o.POS) |
|
|
|
def sample_name(path): |
|
|
|
n = os.path.basename(path) |
|
for ext in [".gz", ".txt", ".perbase"]: |
|
if n.endswith(ext): |
|
n = n[:-len(ext)] |
|
return n |
|
|
|
|
|
def next_line(fh, header, int_fields): |
|
try: |
|
l = next(fh) |
|
d = dict(zip(header, l.strip().split("\t"))) |
|
for f in int_fields: |
|
d[f] = int(d[f]) |
|
return PerBase(**d) |
|
except StopIteration: |
|
return None |
|
|
|
|
|
def lines(handles, chroms: typing.List[str]): |
|
int_fields = "DEPTH A C G T N INS DEL REF_SKIP FAIL POS".split() |
|
|
|
chroms = {c: i for i, c in enumerate(chroms)} |
|
|
|
heap = [] |
|
header = None |
|
for i, h in enumerate(handles): |
|
l = next(h) |
|
assert l[0] == 'R' # starts with REF |
|
header = l.strip().split("\t") |
|
header.remove("NEAR_MAX_DEPTH") |
|
d = next_line(h, header, int_fields) |
|
d.i = i |
|
d.tid = chroms[d.REF] |
|
d.sample = sample_name(h.name) |
|
heapq.heappush(heap, d) |
|
|
|
while len(heap) > 0: |
|
d = heapq.heappop(heap) |
|
yield d |
|
i, sample = d.i, d.sample |
|
|
|
d = next_line(handles[i], header, int_fields) |
|
if d is None: continue |
|
d.i, d.sample = i, sample |
|
d.tid = chroms[d.REF] |
|
heapq.heappush(heap, d) |
|
|
|
vcf_header = """\ |
|
##fileformat=VCFv4.2 |
|
%s |
|
##FILTER=<ID=PASS,Description="All filters passed"> |
|
##INFO=<ID=A,Number=1,Type=Integer,Description="count of A's at this site"> |
|
##INFO=<ID=C,Number=1,Type=Integer,Description="count of C's at this site"> |
|
##INFO=<ID=T,Number=1,Type=Integer,Description="count of T's at this site"> |
|
##INFO=<ID=G,Number=1,Type=Integer,Description="count of G's at this site"> |
|
##INFO=<ID=N,Number=1,Type=Integer,Description="count of G's at this site"> |
|
##INFO=<ID=REF_SKIP,Number=1,Type=Integer,Description="count of REF_SKIP's at this site"> |
|
##INFO=<ID=INS,Number=1,Type=Integer,Description="count of INS's at this site"> |
|
##INFO=<ID=DEL,Number=1,Type=Integer,Description="count of DEL's at this site"> |
|
##INFO=<ID=FAIL,Number=1,Type=Integer,Description="count of FAIL's at this site"> |
|
##INFO=<ID=DEPTH,Number=1,Type=Integer,Description="DEPTH at this site"> |
|
##FORMAT=<ID=A,Number=1,Type=Integer,Description="count of A's at this site"> |
|
##FORMAT=<ID=C,Number=1,Type=Integer,Description="count of C's at this site"> |
|
##FORMAT=<ID=T,Number=1,Type=Integer,Description="count of T's at this site"> |
|
##FORMAT=<ID=G,Number=1,Type=Integer,Description="count of G's at this site"> |
|
##FORMAT=<ID=N,Number=1,Type=Integer,Description="count of G's at this site"> |
|
##FORMAT=<ID=REF_SKIP,Number=1,Type=Integer,Description="count of REF_SKIP's at this site"> |
|
##FORMAT=<ID=INS,Number=1,Type=Integer,Description="count of INS's at this site"> |
|
##FORMAT=<ID=DEL,Number=1,Type=Integer,Description="count of DEL's at this site"> |
|
##FORMAT=<ID=FAIL,Number=1,Type=Integer,Description="count of FAIL's at this site"> |
|
##FORMAT=<ID=DEPTH,Number=1,Type=Integer,Description="DEPTH at this site"> |
|
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT """ |
|
|
|
|
|
def main(*, fai: pathlib.Path, files:typing.List[pathlib.Path], min_depth:int=0, min_alt_depth:int=1): |
|
""" |
|
|
|
:param list[pathlib.Path] files: perbase files output with -z (zero-based) |
|
:param int min_depth: only display sites where at least 1 sample has this depth |
|
:param int min_alt_depth: only display sites where at least 1 sample has this depth for a non-reference allele |
|
""" |
|
handles = [gzip.open(f, 'rt') for f in files] |
|
samples = [sample_name(h.name) for h in handles] |
|
chroms = [l.strip().split("\t")[0] for l in open(fai)] |
|
header_chroms = ['##contig=<ID=%s>' % c for c in chroms] |
|
|
|
print(vcf_header % "\n".join(header_chroms), end="") |
|
print("\t".join(samples)) |
|
|
|
all_alleles = set("ACTG") |
|
fields = ['A', 'C', 'T', 'G', 'N', 'REF_SKIP', 'INS', 'DEL', 'FAIL'] |
|
|
|
for g, ls in it.groupby(lines(handles, chroms), op.attrgetter('REF', 'POS')): |
|
ls = list(ls) |
|
if min_depth > 0 and max(l.DEPTH for l in ls) < min_depth: continue |
|
alts = all_alleles - set(ls[0].REF_BASE) |
|
by_sample = {l.sample: l for l in ls} |
|
for alt in alts: |
|
if max(getattr(l, alt) for l in ls) < min_alt_depth: continue |
|
line = [ls[0].REF, ls[0].POS + 1, '.', ls[0].REF_BASE, alt, 10, "."] |
|
info = {} |
|
fmt = {} |
|
for sample in samples: |
|
if not sample in by_sample: |
|
fmt[sample] = ":".join(['.'] * len(fields)) |
|
else: |
|
fmt[sample] = ":".join([str(getattr(by_sample[sample], f)) for f in fields]) |
|
for field in fields: |
|
if not field in info: info[field] = 0 |
|
info[field] += getattr(by_sample[sample], field) |
|
|
|
line.append(";".join(f"{n}={v}" for n, v in info.items())) |
|
line.append(":".join(fields)) |
|
line.extend(fmt[s] for s in samples) |
|
print("\t".join(map(str, line))) |
|
|
|
if __name__ == '__main__': |
|
defopt.run(main) |