Skip to content

Instantly share code, notes, and snippets.

@brevans
Created August 5, 2014 20:07
Show Gist options
  • Save brevans/978f2d86b4788b3e3560 to your computer and use it in GitHub Desktop.
Save brevans/978f2d86b4788b3e3560 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from os import path, makedirs, remove, stat
import argparse
import re
from glob import iglob
from collections import defaultdict as dd
from Bio import SeqIO
import vcf
AMB = {'AC': 'M', 'AG': 'R', 'AT': 'W', 'CG': 'S', 'CT': 'Y', 'GT': 'K',
'CA': 'M', 'GA': 'R', 'TA': 'W', 'GC': 'S', 'TC': 'Y', 'TG': 'K',
'AA': 'A', 'CC': 'C', 'GG': 'G', 'TT': 'T'}
def m_dir(d):
try:
makedirs(d)
except OSError:
pass
def get_ref_dict(fa_name):
'''
in: the name of a fasta reference file
out: a dictionary of the sequences inside ['seqname']-> Bio.Seq object
'''
return (SeqIO.index(fa_name, "fasta"))
def get_sample_name(vcf_fi, comp_fi):
'''
in: vcf file (with path) and hapcompass file (with path)
if the sample name doesn't match raise an Exception
out: the sample name
'''
comp_name = path.basename(comp_fi)
vcf_name = path.basename(vcf_fi)
pref_match = re.match('(\S+)_out_\S+_solution\.txt$', comp_name)
prefix = pref_match.group(1)
if not vcf_name.startswith(prefix):
print("Your files {} and {} don't seem to match, exiting.".format(
vcf_name, comp_name))
raise Exception("filenames don't match")
return prefix
def get_hap_info(fi):
'''
in: a filename of a haplotype block phasing file that was output
from HapCompass
out: a dict: chrom->bp->[phase1,phase2] and the phasing info not to be used
a list of all unused lines from the phasing file
a set of all loci that had > 1 blocks
'''
info = {}
blocks = set()
dupes = set()
unused_info = []
capture = None
curr_block = None
'''
BLOCK 195 217 1 2 64.0 LG98
VAR_POS_195 195 1 0 1
VAR_POS_217 217 2 0 1
'''
# read in block info
for l in open(fi):
if l == '\n':
continue
tmp = l.split()
if l.startswith('BLOCK'):
curr_block = tmp[6]
#if we encounter more than one block,
# we won't be using this phasing info
if curr_block in blocks:
if curr_block in info:
unused_info += info[curr_block]['original']
del info[curr_block]
else:
unused_info.append(l)
dupes.add(curr_block)
capture = False
unused_info.append(l)
else:
blocks.add(curr_block)
info[curr_block] = {}
info[curr_block]['original'] = [l]
capture = True
elif capture == True:
info[curr_block][int(tmp[1])] = [int(tmp[3]), int(tmp[4])]
info[curr_block]['original'].append(l)
elif capture == False:
unused_info.append(l)
return info, unused_info
def make_vcf_dict(vcf_reader):
vcf_dict = dd(lambda: {})
hets = dd(lambda: 0)
for v in vcf_reader:
vcf_dict[v.CHROM][v.POS] = {'ALLELES': [v.REF] + [x.sequence for x in v.ALT],
'IS_HET': v.samples[0].is_het,
'GT': [int(x) for x in v.samples[0].gt_alleles]}
if v.samples[0].is_het:
hets[v.CHROM] += 1
return vcf_dict, hets
def clean_up(fhs):
for s in fhs:
names = [x.name for x in fhs[s]]
[x.close() for x in fhs[s]]
[remove(fi) for fi in names if stat(fi).st_size == 0]
def process_files(fa_file, vcf_dir, hap_dir, out_dir, verb):
m_dir(out_dir)
ref = get_ref_dict(fa_file)
# so I can write sequences to hdls[chrom][0] for phased, [1] for unphased
hdls = {x: [open(path.join(out_dir, x + '_ambi.txt'), 'w'),
open(path.join(out_dir, x + '_phased.txt'), 'w')]
for x in ref.keys()}
unusedfh = open(path.join(out_dir, 'unused_blocks.txt'), 'w')
for comp_fi, vcf_fi in zip(iglob(path.join(hap_dir, '*_solution.txt')),
iglob(path.join(vcf_dir, '*.vcf'))):
name = get_sample_name(vcf_fi, comp_fi)
hap_info, unused = get_hap_info(comp_fi)
pref = name + '\t'
if len(unused) > 0:
unusedfh.write(pref + pref.join(unused))
vcf_reader = vcf.Reader(open(vcf_fi, 'r'))
#snps chrom->pos-> REF:'A', ALT:['T','C']
snps, hetnum = make_vcf_dict(vcf_reader)
for chrom in ref.keys():
if verb:
verb_pref = "Locus {} for individual {} : ".format(name, chrom)
phased = False
#ref is ZERO indexed
seqs = [list(ref[chrom]), list(ref[chrom])]
if chrom not in snps:
phased = True
if verb:
print(verb_pref + " no SNPs found in vcf. Reference sequence used.")
elif chrom in snps:
if hetnum[chrom] == 0:
#phased, fixed snps
phased = True
if verb:
print(verb_pref + " SNPs found, with no heterozygous sites. Phased but fixed genotypes.")
elif hetnum[chrom] == 1:
#phased, one het
phased = True
if verb:
print(verb_pref + " SNPs found, with one heterozygous site. Phasing is easy!")
elif hetnum[chrom] > 1:
if chrom in hap_info and len(hap_info[chrom]) == hetnum[chrom] + 1:
phased = True
if verb:
print(verb_pref + "phased in Hapcompass, all heterozygous sites accounted for.")
elif chrom in hap_info and len(hap_info[chrom]) != hetnum[chrom] + 1:
phased = False
if verb:
print(verb_pref + "phased in Hapcompass, but some sites unaccounted for. Output is ambiguous")
else:
phased = False
if verb:
print(verb_pref + "either HapCompass didn't phase or had multiple blocks for this locus. Output is ambiguous")
for i, bp in enumerate(seqs[0]):
#vcf is ONE indexed
vi = i + 1
if vi in snps[chrom]:
if phased:
if snps[chrom][vi]['IS_HET']:
if chrom in hap_info:
bp1, bp2 = hap_info[chrom][vi]
else:
bp1, bp2 = snps[chrom][vi]['GT']
seqs[0][i] = snps[chrom][vi]['ALLELES'][bp1]
seqs[1][i] = snps[chrom][vi]['ALLELES'][bp2]
else:
bp1, bp2 = snps[chrom][vi]['GT']
seqs[0][i] = snps[chrom][vi]['ALLELES'][bp1]
seqs[1][i] = snps[chrom][vi]['ALLELES'][bp2]
else:
snp = AMB[''.join([snps[chrom][vi]['ALLELES'][x]
for x in snps[chrom][vi]['GT']])]
seqs[0][i] = snp
if phased:
seqa = ''.join(seqs[0])
seqb = ''.join(seqs[1])
hdls[chrom][phased].write('>' + name + 'a\n' + seqa + '\n')
hdls[chrom][phased].write('>' + name + 'b\n' + seqb + '\n')
elif not phased:
seq = ''.join(seqs[0])
hdls[chrom][phased].write('>' + name + '\n' + seq + '\n')
phased = None
clean_up(hdls)
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description=("Given a reference fasta file and a collection "
"of paired vcf/HapCompass output files, generate "
"haplotype sequences for each individual grouped "
"by locus/chromosome"))
parser.add_argument("-r", "--ref", help=("The reference fasta file used in "
"generating the vcf files"), required=True)
parser.add_argument("-v", "--vcf", help=("The directory of vcfs used to "
"generate the HapCompass output file. ONE vcf per sample. "
"Default is current directory."),
default='.')
parser.add_argument("-c", "--hapcompass", help=("The directory containing "
"your HapCompass output _solution.txt files. Default is"
"the current directory."),
default='.')
parser.add_argument("-o", "--out", help=("The directory to store your results. "
"Deefault is ./out."), default='./out')
parser.add_argument("--verbose", help="Print extra info", default=False, action='store_true')
args = parser.parse_args()
fa_file = path.abspath(args.ref)
vcf_dir = path.abspath(args.vcf)
hap_dir = path.abspath(args.hapcompass)
out_dir = path.abspath(args.out)
verb = args.verbose
process_files(fa_file, vcf_dir, hap_dir, out_dir, verb)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment