Created July 27, 2020 18:35
Convert a novel SNP VCF to a usable CSV table for manually checking novel SNPs
# -*- coding: utf-8 -*-
Created on Wed Feb 12 14:13:00 2020
@author: hunte
import sys
def createBED(pos, bedFile):
with open(bedFile, "w") as w:
w.write("\t".join(["chrY",str(pos-1-500),str(pos + 500)]) + "\n")
import subprocess
def getSequenceFromFasta(bedFile, referenceFile):
sequence = subprocess.check_output(["bedtools", "getfasta", "-fi", referenceFile, "-bed", bedFile]).split("\n")[1]
return sequence
def getBLAT(userSeq, blatFile):
a = subprocess.check_output(['curl', '-d', "userSeq=" + userSeq + "&type=DNA", '-X', 'POST', ''])
with open(blatFile, "w") as w:
def executeMinimap2(referenceFile, fastQFile, pafOutputFile):
a = subprocess.check_output(['minimap2', '-c', '-X', referenceFile, fastQFile])
with open(pafOutputFile, "w") as w:
ignoreSequences = ["fix","alt"]
def parsePAF(pos, pafOutputFile):
fails = {}
startPosition = pos - 501
endPosition = pos + 500
with open(pafOutputFile, "r") as r:
lines = r.readlines()
for line in lines:
splitRow = line.split("\t")
querySeqLength = int(splitRow[1])
targSeqName = splitRow[5]
targStart = int(splitRow[7])
targEnd = int(splitRow[8])
residueMatches = int(splitRow[9])
alignmentBlockLength = int(splitRow[10])
if any(s in targSeqName for s in ignoreSequences) == False:
if targStart != startPosition or targEnd != endPosition:
minDenom = min([querySeqLength, alignmentBlockLength])
percent = float(residueMatches) / float(minDenom)
if querySeqLength > 500 and alignmentBlockLength > 500 and percent > 0.95:
fails[percent] = str(round(percent * 100,1)) + "% " + targSeqName + ":" + str(targStart) + ".." + str(targEnd) + " " + str(minDenom)
if len(fails) > 0:
maxPercent = max(fails.keys())
return fails[maxPercent]
return "ok"
def parseBLAT(pos, blatOutputFile):
import re
startPosition = pos - 500
fails = []
with open(blatOutputFile, "r") as r:
lines = r.readlines()
foundResults = False
parseString = "YourSeq"#"chrY:" + str(pos-501) + "-" + str(pos+500)
for line in lines:
if foundResults:
if parseString in line:
resultsSplit = line.split(parseString)[2]
stripped = re.sub(' +', ' ', resultsSplit).strip().split(" ")
targStartPos = int(stripped[7])
targEndPos = int(stripped[8])
targSeqName = stripped[5]
if any(s in targSeqName for s in ignoreSequences) == False:
if targStartPos != startPosition or targEndPos != startPosition + 1000:
percent = float(stripped[4].replace("%","")) / 100
targStart = int(stripped[1])
targEnd = int(stripped[2])
span = int(stripped[9])
if targEnd - targStart > 500 and span > 500 and percent >= .95:
fails.append(str(round(percent * 100,1)) + "% " + targSeqName + ":" + str(targStart) + ".." + str(targEnd) + " " + str(min([span,targEnd - targStart])))
if "----------" in line:
foundResults = True
return fails
def createFastQ(seq, fastQFilename):
with open(fastQFilename, "w") as w:
w.write(seq + "\n")
dummy = "".join("F" for f in seq)
w.write(dummy + "\n")
def oneOff(referenceFile, refIndexFile, position):
bedFile = "bed.bed"
fastQFile = "fastq.fastq"
createBED(position, bedFile)
seq = getSequenceFromFasta(bedFile, referenceFile)
createFastQ(seq, fastQFile)
pafOutputFile = "alignment.paf"
executeMinimap2(refIndexFile, fastQFile, pafOutputFile)
return parsePAF(position, pafOutputFile)
def testPAF(refIndexFile, position):
pafOutputFile = "alignment.paf"
return parsePAF(position, pafOutputFile)
def blatOneOff(referenceFile, position):
bedFile = "bed.bed"
createBED(position, bedFile)
blatFile = str(position) + "_BLAT"
seq = getSequenceFromFasta(bedFile, referenceFile)
getBLAT(seq, blatFile)
fails = parseBLAT(position, blatFile)
if len(fails) > 0:
return fails[0]
return "ok"
def checkRanges(pos):
rangeMap = {"PAR1": [0,2781478],
"CEN": [10072349,11686749],
"DYZ19": [20054913, 20351053],
for r in rangeMap:
if pos >= rangeMap[r][0] and pos <= rangeMap[r][1]:
return r
return "ok"
def analyzeNovelSNPs(vcfFile, outputFile, referenceFile):
refIndexFile = referenceFile + ".mmi"
import time
start_time = time.time()
passing = []
with open(vcfFile, "r") as f:
with open(outputFile, "w") as w:
lines = f.readlines()
headers = lines[0].replace("\n","").split("\t")
headers = headers + ["Exclusions","BLAT link","UCSC Genome Browser"]
w.write("\t".join(headers) + "\n")
for line in lines[1:]:
rowSplit = line.replace("\n","").split("\t")
pos = rowSplit[1]
idVal = "chrY:" + pos
intpos = int(pos)
rowSplit[2] = idVal
checkedRange = checkRanges(intpos)
if checkedRange == "ok":
checkedRange = oneOff(referenceFile, refIndexFile, intpos)
if checkedRange == "ok":
uscsGenomeBrowser = "" + str(intpos - 5) + "%2D" + str(intpos + 5) + "&hgsid=661900165_RCnZU2Sd4dWMqySzhwCkmOqVSxQI"
rowSplit = rowSplit + [checkedRange, blatLink, uscsGenomeBrowser]
w.write("\t".join(rowSplit) + "\n")
with open("novelPassingPositionsForBLATCheck.txt", "w") as w:
end_time = time.time()
print("elapsed time: " + str(end_time - start_time) + "s")
from time import sleep
if len(sys.argv) > 3:
mode = sys.argv[1]
if mode == "-batch":
vcfFile = sys.argv[2]
outputFile = sys.argv[3]
referenceFile = sys.argv[4]
analyzeNovelSNPs(vcfFile, outputFile, referenceFile)
if mode == "-blat":
delay = 16
results = []
referenceFile = sys.argv[2]
positions = []
oktotal = 0
for pos in sys.argv[3].split(","):
for pos in positions:
result = blatOneOff(referenceFile, pos)
if result == "ok":
oktotal = oktotal + 1
results.append(str(pos) + " " + result)
for result in results:
print(str(oktotal) + " of " + str(len(positions)) + " pass")
if mode == "-minimap":
referenceFile = sys.argv[2]
position = int(sys.argv[3])
print(oneOff(referenceFile, position))
