Skip to content

Instantly share code, notes, and snippets.

@brevans
Created February 2, 2015 18:45
Show Gist options
  • Save brevans/1055dd80afc0e27f7c94 to your computer and use it in GitHub Desktop.
Save brevans/1055dd80afc0e27f7c94 to your computer and use it in GitHub Desktop.
sub-sampler for Dan, specific to her naming scheme
#!/usr/bin/env python
import sys
import re
import random
import argparse
from os import makedirs
from glob import glob
import os.path as path
from collections import defaultdict as dd
from Bio import SeqIO
def parse_file(fn, ft):
'''
Parse a multi-sequence file (fn) of type (ft) with biopython
Return a dictionary of individual key -> list of sequences
and a dictionary of population key -> set of individuals
'''
seqs = dd(lambda: [])
pops = dd(lambda: set())
for seq_req in SeqIO.parse(fn, ft):
mat = re.match('([a-zA-Z]+)_?([a-zA-Z]*\d*)([ab]?)_(\w+)', seq_req.id)
try:
ind_attribs = mat.groups()
except AttributeError as e:
print("Sequence Sample {} in file {} didn't match expected naming scheme!".format(seq_req.id, fn))
raise e
ind_key = ''.join(ind_attribs[:2])
pops[ind_attribs[0]].add(ind_key)
seqs[ind_key].append(seq_req)
return seqs, pops
def generate_random_samples(files_list, dir_out, rand_indivs, rand_samples, ft):
'''
Given a list of sequence files, generate random sub-sampling per-population as inferred by individual name
Writes the desired number of resamplings and subsampled individuals per population to subfolders
'''
oom = len(str(rand_samples))
for i in range(rand_samples):
chosen_samples = None
curr_dir = path.join(dir_out, '{:0{}d}'.format(i+1, oom))
try:
makedirs(curr_dir)
except FileExistsError as e:
pass
for fn in files_list:
out_fn = path.join(curr_dir, path.basename(fn))
seqs, pops = parse_file(fn, ft)
if chosen_samples is None:
chosen_samples = [rand for pop in sorted(pops) for rand in random.sample(pops[pop], rand_indivs)]
# explode sequence dict for each chosen sample
sub_sample = [seq for sam in chosen_samples for seq in seqs[sam]]
SeqIO.write(sub_sample, out_fn, ft)
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Nexus random sub-sampler')
parser.add_argument('-r', help='Number of sub-samples to generate [default: 10]', type=int, dest='num_samples', default=10)
parser.add_argument('-i', help='Number of individuals to keep per population [default: 1]', type=int, dest='num_inds',
default=1)
parser.add_argument('-o', help='Output base directory [default: ./sub_samples]',
type=str, dest='out_dir', default='./sub_samples')
parser.add_argument('-d', help='Input Directory with files [default: current directory]', type=str, dest='in_dir', default='.')
parser.add_argument('-t', help='File type. [default: nexus]', choices=['nexus', 'fasta'], type=str, dest='file_type',
default='nexus')
args = parser.parse_args()
if args.file_type == 'nexus':
exts = ['.nex']
else:
exts = ['.fa', '.fasta']
seq_files = []
for e in exts:
for fi in glob(path.join(path.abspath(args.in_dir), '*' + e)):
seq_files.append(fi)
if len(seq_files) == 0:
parser.print_help()
print('\nError: Found no sequence files of the type specified\n')
sys.exit(1)
generate_random_samples(seq_files, path.abspath(args.out_dir), args.num_inds, args.num_samples, args.file_type)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment