Skip to content

Instantly share code, notes, and snippets.

@KamilSJaron
Last active June 10, 2023 02:36
Show Gist options
  • Save KamilSJaron/cc8ef01947c03fd80abe6bac7451469f to your computer and use it in GitHub Desktop.
Save KamilSJaron/cc8ef01947c03fd80abe6bac7451469f to your computer and use it in GitHub Desktop.
Convertor of manta output to paragraph compatible vcf file(s)
#!/usr/bin/env python3
#
# This script was originally called "convertInversion.py" and was shipped in the Manta package hosted at https://github.com/Illumina/manta
# Manta - Structural Variant and Indel Caller
# Copyright (c) 2013-2020 Illumina, Inc., Kamil S. Jaron
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
# Kamil's adjustments
# - converting to python3
# - removing the samtools fullpath (assuming to be in PATH)
# - creating Paragraph compatible VCF file (including other aspects that are incompatible now, such as filtering of border SVs and <INV> placeholders)
import sys
import gzip
import argparse
from subprocess import check_output
from os import path
from os.path import exists
from collections import defaultdict
from pyfaidx import Fasta
class VcfRecord:
def __init__(self, inline):
tokens = inline.strip().split('\t')
self.chr = tokens[0]
self.pos = int(tokens[1])
self.vid = tokens[2]
self.ref = tokens[3]
self.alt = tokens[4]
self.qual = tokens[5]
self.filter = tokens[6]
self.info = tokens[7].split(';')
self.others = "\t".join(tokens[8:])
# Create a dictionary for INFO
self.infoDict ={}
for infoItem in self.info:
items = infoItem.split('=')
if len(items) == 1:
self.infoDict[items[0]] = True
elif len(items) > 1:
self.infoDict[items[0]] = items[1]
self.isINV3 = False
self.isINV5 = False
self.mateChr = ""
self.matePos = -1
def checkInversion(self):
def getMateInfo(splitChar):
items = self.alt.split(splitChar)
[self.mateChr, matePos] = items[1].split(':')
self.matePos = int(matePos)
if self.alt.startswith('['):
getMateInfo('[')
if self.mateChr == self.chr:
self.isINV5 = True
elif self.alt.endswith(']'):
getMateInfo(']')
if self.mateChr == self.chr:
self.isINV3 = True
def adjustVcfRecords(self):
# rename MantaDUP:TANDEM to simply MantaDUP and <DUP> respectively
if "MantaDUP:TANDEM" in self.vid:
self.vid = "MantaDUP" + self.vid[15:]
self.alt = "<DUP>"
def makeLine(self):
infoStr = ";".join(self.info)
self.line = "\t".join((self.chr,
str(self.pos),
self.vid,
self.ref,
self.alt,
self.qual,
self.filter,
infoStr,
self.others
))+"\n"
def scanVcf(vcfFile):
invMateDict = {}
if vcfFile.endswith('gz'):
fpVcf = gzip.open(vcfFile, 'rt')
else:
fpVcf = open(vcfFile, 'r')
for line in fpVcf:
if line.startswith('#'):
continue
# else:
# break
vcfRec = VcfRecord(line)
vcfRec.checkInversion()
if vcfRec.isINV3 or vcfRec.isINV5:
if vcfRec.vid in invMateDict:
# update mate INFO
invMateDict[vcfRec.vid] = vcfRec.infoDict
else:
mateId = vcfRec.infoDict["MATEID"]
invMateDict[mateId] = ""
return invMateDict
def convertInversions(args, invMateDict):
isHeaderInfoAdded = False
isHeaderAltAdded = False
filteringStats = defaultdict(int)
if args.prefix_split_by_type:
out_files = dict()
out_files['INV'] = open(args.prefix_split_by_type + "_INV.vcf", 'w')
out_files['INS'] = open(args.prefix_split_by_type + "_INS.vcf", 'w')
out_files['DUP'] = open(args.prefix_split_by_type + "_DUP.vcf", 'w')
out_files['DEL'] = open(args.prefix_split_by_type + "_DEL.vcf", 'w')
if args.mantaVcf.endswith('gz'):
fpVcf = gzip.open(args.mantaVcf, 'rt')
else:
fpVcf = open(args.mantaVcf, 'r')
refFasta = Fasta(args.refFasta)
for line in fpVcf:
if line.startswith('#'):
if (not isHeaderInfoAdded) and line.startswith("##FORMAT="):
header_line = "##INFO=<ID=INV3,Number=0,Type=Flag,Description=\"Inversion breakends open 3' of reported location\">\n##INFO=<ID=INV5,Number=0,Type=Flag,Description=\"Inversion breakends open 5' of reported location\">\n"
if args.prefix_split_by_type:
out_files["INV"].write(header_line)
else:
sys.stdout.write(header_line)
isHeaderInfoAdded = True
if (not isHeaderAltAdded) and line.startswith("##ALT="):
header_line = "##ALT=<ID=INV,Description=\"Inversion\">\n"
if args.prefix_split_by_type:
out_files["INV"].write(header_line)
else:
sys.stdout.write(header_line)
isHeaderAltAdded = True
if args.prefix_split_by_type:
out_files["INV"].write(line)
out_files["DEL"].write(line)
out_files["INS"].write(line)
out_files["DUP"].write(line)
else:
sys.stdout.write(line)
continue
vcfRec = VcfRecord(line)
# skip mate record
if vcfRec.vid in invMateDict:
continue
vcfRec.checkInversion()
if vcfRec.isINV3 or vcfRec.isINV5:
if vcfRec.isINV5:
# adjust POS for INV5
# KSJ comments:
# This is a bit iffy part, I am not sure why original script was shifting the position by 1
# but the genotyper can genotype slighlt imprecise variants and therefore I don't think it will change that much
# In cases that the 5' breakpoint is on position 1 it shifts outside of the scaffold
# which the reason why I am shifting only if it's not outside of the scaffold
if vcfRec.pos > 1:
vcfRec.pos -= 1
vcfRec.matePos -= 1
seq = refFasta[vcfRec.chr][(vcfRec.pos - 1):vcfRec.pos]
vcfRec.ref = seq.seq.upper()
# update manta ID
vidSuffix = vcfRec.vid.split("MantaBND")[1]
idx = vidSuffix.rfind(':')
vcfRec.vid = "MantaINV%s" % vidSuffix[:idx]
# symbolic ALT
vcfRec.alt = "<INV>"
# add END
infoEndStr = "END=%d" % vcfRec.matePos
newInfo = [infoEndStr]
for infoItem in vcfRec.info:
if infoItem.startswith("SVTYPE"):
# change SVTYPE
newInfo.append("SVTYPE=INV")
# add SVLEN
infoSvLenStr = "SVLEN=%d" % (vcfRec.matePos-vcfRec.pos)
newInfo.append(infoSvLenStr)
elif infoItem.startswith("CIPOS"):
newInfo.append(infoItem)
# set CIEND
isImprecise = "IMPRECISE" in vcfRec.infoDict
# for imprecise calls, set CIEND to the mate breakpoint's CIPOS
if isImprecise:
mateId = vcfRec.infoDict["MATEID"]
mateInfoDict = invMateDict[mateId]
infoCiEndStr = "CIEND=%s" % (mateInfoDict["CIPOS"])
newInfo.append(infoCiEndStr)
# for precise calls, set CIEND w.r.t HOMLEN
else:
if "HOMLEN" in vcfRec.infoDict:
infoCiEndStr = "CIEND=-%s,0" % vcfRec.infoDict["HOMLEN"]
newInfo.append(infoCiEndStr)
elif infoItem.startswith("HOMSEQ"):
# update HOMSEQ for INV5
if vcfRec.isINV5:
cipos = vcfRec.infoDict["CIPOS"].split(',')
homSeqStart = vcfRec.pos + int(cipos[0]) + 1
homSeqEnd = vcfRec.pos + int(cipos[1])
# I have not verified this is correct, it's using the same logic as above
# but as the tag is not used I decided not to dig into this much
refSeq = refFasta[vcfRec.chr][(homSeqStart - 1):(homSeqEnd)].seq.upper()
infoHomSeqStr = "HOMSEQ=%s" % refSeq
newInfo.append(infoHomSeqStr)
else:
newInfo.append(infoItem)
# skip BND-specific tags
elif (infoItem.startswith("MATEID") or
infoItem.startswith("BND_DEPTH") or
infoItem.startswith("MATE_BND_DEPTH")):
continue
# update event ID
elif infoItem.startswith("EVENT"):
eidSuffix = vcfRec.infoDict["EVENT"].split("MantaBND")[1]
idx = vidSuffix.rfind(':')
infoEventStr = "EVENT=MantaINV%s" % eidSuffix[:idx]
newInfo.append(infoEventStr)
# apply all other tags
else:
newInfo.append(infoItem)
# add INV3/INV5 tag
if vcfRec.isINV3:
newInfo.append("INV3")
elif vcfRec.isINV5:
newInfo.append("INV5")
vcfRec.info = newInfo
# for adjustments I made a method (now chaniging DUP:TANDEM to DUP)
vcfRec.adjustVcfRecords()
vcfRec.makeLine()
# line chr pos vid ref alt qual filter others
if vcfRec.pos < args.r:
filteringStats['too_close'] += 1
continue
if vcfRec.alt == '<INS>':
filteringStats['unresolved_INS'] += 1
continue
if 'MantaBND' in vcfRec.vid :
filteringStats['BND'] += 1
continue
if vcfRec.alt != '<INS>' and 'SVINSSEQ' in vcfRec.line:
filteringStats['NONINS_SVINSSEQ'] += 1
continue
if args.quality_filtering and vcfRec.filter != "PASS":
filteringStats['QUAL'] += 1
continue
if vcfRec.ref == "N" :
filteringStats['unknown_ref'] += 1
continue
if args.prefix_split_by_type:
out_files[vcfRec.vid[5:8]].write(vcfRec.line)
else:
sys.stdout.write(vcfRec.line)
if args.prefix_split_by_type:
out_files['INV'].close()
out_files['INS'].close()
out_files['DUP'].close()
out_files['DEL'].close()
return filteringStats
if __name__=='__main__':
parser = argparse.ArgumentParser(description='Converts manta diploidSV output to paragraph compatible vcf file.')
parser.add_argument('refFasta', help='the reference .fasta file (can be gzipped)')
parser.add_argument('mantaVcf', help='the manta _diploidSV.vcf file (can be gzipped)')
parser.add_argument('-r', '-read_length', default=150, type=int, help='to filter the variants located less than read length bases away from the beginning of the scaffolds (default: 150)')
parser.add_argument('--quality_filtering', action="store_true", default=False, help='set to filter all the non-PASS variants (default: False)')
parser.add_argument('-prefix_split_by_type', default='', help='if specified prefix, variants are (default: merged output streamed to std stream)')
# TODO TEST FOR SAMTOOLS
args = parser.parse_args()
# for testing
# args.refFasta = 'test/Tms_sample_ref.fasta'
# args.mantaVcf = 'test/test_diploidSV.vcf'
# args.r = 150
invMateDict = scanVcf(args.mantaVcf)
filteringStats = convertInversions(args, invMateDict)
sys.stderr.write("Filtering " + str(filteringStats['too_close']) + " with postion < " + str(args.r) + "\n")
sys.stderr.write("Filtering " + str(filteringStats['BND']) + " breakpoints (BND) that were not possible to convert to inversions\n")
sys.stderr.write("Filtering " + str(filteringStats['unresolved_INS']) + " unresolved interstions (those with alt tag <INS>)\n")
sys.stderr.write("Filtering " + str(filteringStats['NONINS_SVINSSEQ']) + " non INS with SVINSSEQ tag in the info (nested SVs)\n")
sys.stderr.write("Filtering " + str(filteringStats['unknown_ref']) + " variants with unknown refernce sequence (N)\n")
if args.quality_filtering:
sys.stderr.write("Filtering " + str(filteringStats['QUAL']) + " variants with non-PASS quality\n")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment