@mparker2
Created November 29, 2021 10:28
Collapse event level f5c to kmer level
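Streams an eventalign TSV (from f5c or nanopolish, run with signal indices so that the start_idx/end_idx columns are present) one read at a time, and collapses the multiple signal events that can align to a single reference position into one record per kmer, pooling event means and standard deviations weighted by the number of signal points in each event.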
import itertools as it
from functools import partial
from multiprocessing import Pool
import numpy as np
import pandas as pd
import click

# Column dtypes for the f5c/nanopolish eventalign TSV
# (run with signal indices enabled, hence the start_idx/end_idx columns).
EA_TYPES = {
    'contig': str,
    'position': int,
    'reference_kmer': str,
    'read_index': int,
    'read_name': str,
    'strand': str,
    'event_index': int,
    'event_level_mean': float,
    'event_stdv': float,
    'event_length': float,
    'model_kmer': str,
    'model_mean': float,
    'model_stdv': float,
    'standardized_level': float,
    'start_idx': int,
    'end_idx': int,
}
COLLAPSED_COLUMNS = [
    'contig', 'position', 'reference_kmer',
    'read_index', 'strand', 'event_index_start',
    'kmer_level_mean', 'kmer_stdv', 'kmer_length',
    'model_kmer', 'model_mean', 'model_stdv',
    'standardized_level', 'start_idx', 'end_idx',
]

def parse_eventalign(eventalign_fn):
    '''Stream an eventalign TSV, yielding the records for one read at a time.'''
    ea_iter = pd.read_csv(
        eventalign_fn,
        header=0,
        sep='\t',
        iterator=True,
        chunksize=100_000,
        dtype=EA_TYPES,
    )
    orphan_records = pd.DataFrame()
    for chunk in ea_iter:
        chunk = pd.concat([orphan_records, chunk], axis=0)
        # the last read in a chunk may continue into the next chunk, so
        # hold its records back as "orphans" until the read is complete
        orphan_read_id = chunk['read_index'].iloc[-1]
        for r_id, read_records in chunk.groupby(by='read_index', as_index=False, sort=False):
            if r_id != orphan_read_id:
                yield read_records
            else:
                orphan_records = read_records
    # once the file is exhausted, the final orphan read is complete
    if len(orphan_records):
        yield orphan_records

def calculate_kmer_level_stats(means, stdvs, ns):
    '''Pool per-event means and stdvs, weighted by signal points per event.

    The pooled variance is the weighted mean of the within-event variances
    plus a correction term for the spread between the event means.
    '''
    var = stdvs ** 2
    pooled_var = sum(var * ns) / sum(ns)
    pooled_var_correction = 0
    for i, j in it.combinations(np.arange(len(var)), r=2):
        pooled_var_correction += ns[i] * ns[j] * (means[i] - means[j]) ** 2
    pooled_var_correction /= sum(ns) ** 2
    pooled_std = np.sqrt(pooled_var + pooled_var_correction)
    pooled_mean = sum(means * ns) / sum(ns)
    return pooled_mean, pooled_std
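
# A quick sanity check of the pooling (hypothetical values, not from real
# reads): two events of 10 signal points each, with means 80.0 and 90.0 and
# stdvs of 2.0, pool to a mean of 85.0 and a stdv of sqrt(4 + 25) ~= 5.39,
# because the spread between the event means feeds into the pooled variance:
# >>> calculate_kmer_level_stats(
# ...     np.array([80.0, 90.0]), np.array([2.0, 2.0]), np.array([10, 10]))
# -> (85.0, 5.385...)
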
def collapse_events(records, ignore_n=True):
    '''Collapse per-event rows for one read into one row per kmer position.'''
    kmer_size = len(records.reference_kmer.iloc[0])
    n_kmer = 'N' * kmer_size
    if ignore_n:
        records = records.query('model_kmer != @n_kmer')
    collapsed = []
    for pos, events in records.groupby(by='position', as_index=False, sort=False):
        # calc kmer level event stats
        kmer_means = events.event_level_mean.values
        kmer_stds = events.event_stdv.values
        kmer_points = events.end_idx.values - events.start_idx.values
        if len(kmer_means) == 1:
            kmer_mean = kmer_means[0]
            kmer_std = kmer_stds[0]
        else:
            kmer_mean, kmer_std = calculate_kmer_level_stats(
                kmer_means, kmer_stds, kmer_points,
            )
        kmer_duration = events.event_length.sum()
        kmer_event_start_idx = events.event_index.iloc[0]
        kmer_start_idx = events.start_idx.min()
        kmer_end_idx = events.end_idx.max()
        # calc kmer level model stats
        model_kmers = events.model_kmer.values
        model_means = events.model_mean.values
        model_stds = events.model_stdv.values
        model_kmer_points = kmer_points
        if not ignore_n:
            # mask N-kmers (matching the computed kmer size) when
            # calculating grouped model means and stdvs
            model_kmer_mask = (events.model_kmer != n_kmer).values
            model_kmers = model_kmers[model_kmer_mask]
            model_means = model_means[model_kmer_mask]
            model_stds = model_stds[model_kmer_mask]
            model_kmer_points = model_kmer_points[model_kmer_mask]
        if not len(model_kmers):
            # every event at this position was an N-kmer
            model_kmer = n_kmer
            model_mean, model_std = 0.0, 0.0
        elif len(model_kmers) == 1:
            model_kmer = model_kmers[0]
            model_mean = model_means[0]
            model_std = model_stds[0]
        else:
            # non-N model kmers at one position should all be identical
            model_kmer = model_kmers[0]
            model_mean, model_std = calculate_kmer_level_stats(
                model_means, model_stds, model_kmer_points,
            )
        _first = events.iloc[0]
        collapsed.append([
            _first.contig, pos, _first.reference_kmer,
            _first.read_index, _first.strand, kmer_event_start_idx,
            format(kmer_mean, '.2f'), format(kmer_std, '.3f'),
            format(kmer_duration, '.5f'),
            model_kmer, format(model_mean, '.2f'), format(model_std, '.2f'),
            'inf', kmer_start_idx, kmer_end_idx,
        ])
    return pd.DataFrame(collapsed, columns=COLLAPSED_COLUMNS)

def parallel_collapse_events(ea_stream, ignore_n, processes):
    if processes == 1:
        for records in ea_stream:
            yield collapse_events(records, ignore_n)
    else:
        _collapse = partial(collapse_events, ignore_n=ignore_n)
        # context manager ensures worker processes are cleaned up
        with Pool(processes) as pool:
            yield from pool.imap(_collapse, ea_stream, chunksize=5)

def write_eventalign(output_fn, ea_collapsed):
    with open(output_fn, 'w') as o:
        first = next(ea_collapsed, None)
        if first is None:
            return
        # write the header with the first chunk only
        first.to_csv(o, sep='\t', header=True, index=False)
        for chunk in ea_collapsed:
            chunk.to_csv(o, sep='\t', header=False, index=False)

@click.command()
@click.option('-e', '--eventalign-fn', required=True)
@click.option('-o', '--output-fn', required=True)
@click.option('--ignore-n/--no-ignore-n', default=True)
@click.option('-p', '--processes', required=False, default=1)
def eventalign_collapse(eventalign_fn, output_fn, ignore_n, processes):
    ea_records = parse_eventalign(eventalign_fn)
    ea_collapsed = parallel_collapse_events(ea_records, ignore_n, processes)
    write_eventalign(output_fn, ea_collapsed)


if __name__ == '__main__':
    eventalign_collapse()
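
Example invocation (a sketch; the script and file names are placeholders, assuming the gist is saved as eventalign_collapse.py):

python eventalign_collapse.py -e eventalign.tsv -o collapsed.tsv -p 4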