Skip to content

Instantly share code, notes, and snippets.

@mparker2
Created May 5, 2017 09:22
Show Gist options
  • Save mparker2/8600a8841eac2a20fe166e85f51ae32e to your computer and use it in GitHub Desktop.
Save mparker2/8600a8841eac2a20fe166e85f51ae32e to your computer and use it in GitHub Desktop.
Create clusters of probable PCR duplicates in FASTQ using Unique Molecular Identifiers
import sys
from random import shuffle
from collections import Counter, defaultdict
import pysam
import click
import networkx as nx
def edit_distance(umi_a, umi_b):
'''
Hamming distance between two sequences.
'''
return sum(a != b for a, b in zip(umi_a, umi_b))
def test_count_ratio(count_a, count_b):
'''
sort counts and determine if they fulfill the expression:
larger > 2 * smaller - 1
'''
count_a, count_b = sorted([count_a, count_b], reverse=True)
if count_a > (2 * count_b - 1):
return True
else:
return False
def build_substr_idx(umis, sub_size, idx_size):
'''
Build a dictionary of nearest neighbours using substrings, can be used
to heuristically reduce the number of pairwise comparisons.
see:
https://cs.stackexchange.com/questions/73865/approximate-similarity-search
for more details.
'''
# create index for string where sub_size chars are selected
idx = sub_size * [1, ] + (len(umis[0]) - sub_size) * [0, ]
substr_idx = defaultdict(lambda: defaultdict(set))
for _ in range(idx_size):
# each section of the substr_idx uses different random substrings of
# the same length.
shuffle(idx)
idx_tup = tuple(idx)
for u in umis:
u_sub = ''.join(n for i, n in zip(idx, u) if i)
substr_idx[idx_tup][u_sub].add(u)
return substr_idx
def get_umi_neighbours(u, substr_idx):
'''
use substring dict to get (approximately) all the nearest neighbours to a
umi.
'''
neighbours = set()
for idx, substr_map in substr_idx.items():
u_sub = ''.join(n for i, n in zip(idx, u) if i)
neighbours = neighbours.union(substr_map[u_sub])
neighbours.remove(u)
return neighbours
def create_directional_groups(umis, min_edit, substr_size, index_size):
'''
Create a network of umis connected by having less than min_edit differences
in sequence, and return all the subgraphs of the network (as clusters of
probable PCR duplicates).
'''
umi_counts = Counter(umis)
substr_idx = build_substr_idx(list(umi_counts.keys()),
substr_size,
index_size)
g = nx.Graph()
g.add_nodes_from(umi_counts)
for umi_a in umi_counts:
for umi_b in get_umi_neighbours(umi_a, substr_idx):
if edit_distance(umi_a, umi_b) <= min_edit and test_count_ratio(
umi_counts[umi_a], umi_counts[umi_b]):
g.add_edge(umi_a, umi_b)
labelled_umis = {}
n_groups = 0
for i, subgraph in enumerate(nx.connected_components(g), start=1):
n_groups += 1
for umi in subgraph:
labelled_umis[umi] = str(i)
return labelled_umis, n_groups
@click.command()
@click.argument('fastq')
@click.option('--min-edit-dist', '-e', type=int, default=1)
@click.option('--substr-size', '-s', type=int, default=10)
@click.option('--index-size', '-i', type=int, default=100)
def group_fastq(fastq, min_edit_dist, substr_size, index_size):
'''
Heuristically cluster probable PCR duplicates using UMIs encoded in the
read name. Uses directional method.
'''
umis = []
click.echo('Reading UMIs from {}'.format(fastq), err=True)
with pysam.FastxFile(fastq) as fq:
for record in fq:
_, umi = record.name.rsplit('_', 1)
umis.append(umi)
click.echo('Found {} unique UMI sequences in {} reads'.format(
len(set(umis)), len(umis)), err=True)
click.echo('Producing UMI subgraphs...', err=True)
labelled_umis, n_groups = create_directional_groups(umis,
min_edit_dist,
substr_size,
index_size)
click.echo('Found {} UMI subgraphs...'.format(n_groups), err=True)
zf = len(str(n_groups))
with pysam.FastxFile(fastq) as fq:
for record in fq:
_, umi = record.name.rsplit('_', 1)
umi_group = labelled_umis[umi]
record_id = '@{}_{}'.format(record.name,
umi_group.zfill(zf))
if record.comment:
record_id = '{} {}'.format(record_id, record.comment)
sys.stdout.write('{}\n'.format(record_id))
sys.stdout.write('{}\n'.format(record.sequence))
sys.stdout.write('+\n')
sys.stdout.write('{}\n'.format(record.quality))
if __name__ == '__main__':
group_fastq()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment