Simulate basecalls via simulated squiggles. This is not a serious attempt to correctly model the error profiles of real basecalls.
# Requirements: pip install numpy pysam scrappie
#
# usage: simulate_calls.py [-h] [--mu MU] [--sigma SIGMA] [--noise NOISE]
#                          [--threads THREADS]
#                          fasta ncalls
#
# Simulate basecalls with scrappy.
#
# positional arguments:
#   fasta              Source sequence file.
#   ncalls             Number of basecalls to produce.
#
# optional arguments:
#   -h, --help         show this help message and exit
#   --mu MU            mean fragment length.
#   --sigma SIGMA      stdv fragment length.
#   --noise NOISE      Additional Gaussian noise on signal.
#   --threads THREADS  number of worker threads.
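#
# Example invocation (for illustration only; 'ref.fa' is a placeholder
# reference file, and the --mu/--sigma/--noise values shown are the script
# defaults):
#   python simulate_calls.py ref.fa 100 --mu 8000 --sigma 1000 --noise 0.06
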
import argparse
from concurrent.futures import ThreadPoolExecutor
import functools
import itertools
import logging

import numpy as np
from pysam import FastaFile
import scrappy

comp = {
    'A': 'T', 'T': 'A', 'C': 'G', 'G': 'C', 'X': 'X', 'N': 'N',
    'a': 't', 't': 'a', 'c': 'g', 'g': 'c', 'x': 'x', 'n': 'n',
    #'-': '-'
}
comp_trans = str.maketrans(''.join(comp.keys()), ''.join(comp.values()))


def reverse_complement(seq):
    """Reverse complement a sequence.

    :param seq: input sequence string.

    :returns: reverse-complemented string.
    """
    return seq.translate(comp_trans)[::-1]
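
# Illustrative check of the helper above (added example, not part of the
# original script):
#     reverse_complement('ACCGT') == 'ACGGT'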


def shotgun_library(fasta_file, mu, sigma, direction=(1, -1)):
    """Generate random fragment sequences from an input .fasta file.

    :param fasta_file: input .fasta file.
    :param mu: mean fragment length.
    :param sigma: stdv of fragment length.
    :param direction: tuple representing the direction of output sequences
        with respect to the input sequence.

    :yields: sequence fragments.

    .. note:: Could be made more efficient using buffers for random samples
        and handling cases separately.
    """
    fasta = FastaFile(fasta_file)
    seq_lens = [fasta.get_reference_length(x) for x in fasta.references]
    total_len = sum(seq_lens)
    seq_probs = [x / total_len for x in seq_lens]
    # FastaFile.fetch is rather slow, just read everything into memory
    refs = fasta.references
    fasta = {k: fasta.fetch(k) for k in refs}

    def random_buffer(probs, size=10000):
        while True:
            buf = []
            for x, n in zip(range(len(probs)), np.random.multinomial(size, probs)):
                buf.extend([x] * n)
            np.random.shuffle(buf)
            for x in buf:
                yield x

    seq_chooser = random_buffer(seq_probs)

    # lognormal parameters giving the requested mean and stdv of fragment length
    mean = np.log(mu / np.sqrt(1 + sigma**2 / mu**2))
    stdv = np.sqrt(np.log(1 + sigma**2 / mu**2))

    while True:
        # choose a sequence with probability proportional to its length
        seq_i = next(seq_chooser)
        seq = fasta[refs[seq_i]]
        seq_len = seq_lens[seq_i]
        start = np.random.randint(0, seq_len)
        frag_length = int(np.random.lognormal(mean, stdv))
        move = np.random.choice(direction)
        end = max(0, start + move * frag_length)
        start, end = sorted([start, end])
        if end - start < 2:
            # Expand a bit to ensure we grab at least one base.
            start = max(0, start - 1)
            end += 1
        frag_seq = seq[start:end]
        if move == -1:
            frag_seq = reverse_complement(frag_seq)
        yield frag_seq, refs[seq_i], start, end, '+' if move == 1 else '-'
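
# Illustrative use of the generator above (assumes a reference file 'ref.fa'
# exists; added example, not part of the original script):
#     frags = itertools.islice(shotgun_library('ref.fa', mu=8000, sigma=1000), 5)
#     for seq, ref, start, end, strand in frags:
#         print(ref, start, end, strand, len(seq))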


def worker(args, noise=None):
    """Simulate a squiggle for a fragment and basecall it with scrappy.

    :param args: tuple of (sequence, reference name, start, end, strand).
    :param noise: stdv of additional Gaussian noise added to the signal.

    :returns: a fasta-formatted basecall record.
    """
    model = 'rgrgr_r94'
    seq, ref, start, end, strand = args
    squiggle = scrappy.sequence_to_squiggle(
        seq, rescale=True).data(as_numpy=True, sloika=False)
    # Sample per-base current levels from a Laplace distribution; the scale is
    # chosen so the sample stdv matches the predicted squiggle stdv.
    n = 1 / np.sqrt(2)
    raw_data = np.concatenate([
        np.random.laplace(mean, n * stdv, int(dwell))
        for mean, stdv, dwell in squiggle
    ])
    if noise is not None:
        raw_data += np.random.normal(scale=noise, size=len(raw_data))
    raw = scrappy.RawTable(raw_data)
    raw.scale()
    post = scrappy.calc_post(raw, model, log=True)
    call, score, _ = scrappy.decode_post(post, model)
    return '>call_{}:{}-{}({}) seq_len={} call_len={} score={}\n{}'.format(
        ref, start, end, strand, len(seq), len(call), score, call)
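
# Illustrative single-fragment call (assumes 'ref.fa' exists and scrappy is
# installed; added example, not part of the original script):
#     frag = next(shotgun_library('ref.fa', mu=8000, sigma=1000))
#     print(worker(frag, noise=0.06))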


def main():
    logging.basicConfig(
        format='[%(asctime)s - %(name)s] %(message)s',
        datefmt='%H:%M:%S', level=logging.INFO)
    parser = argparse.ArgumentParser(description='Simulate basecalls with scrappy.')
    parser.add_argument('fasta', help='Source sequence file.')
    parser.add_argument('ncalls', type=int, help='Number of basecalls to produce.')
    parser.add_argument('--mu', type=float, default=8000, help='mean fragment length.')
    parser.add_argument('--sigma', type=float, default=1000, help='stdv fragment length.')
    parser.add_argument('--noise', type=float, default=0.06, help='Additional Gaussian noise on signal.')
    parser.add_argument('--threads', type=int, default=None, help='number of worker threads.')
    args = parser.parse_args()

    regions = itertools.islice(shotgun_library(
        args.fasta, args.mu, args.sigma, direction=(1, -1)
    ), args.ncalls)
    _worker = functools.partial(worker, noise=args.noise)
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        for fasta in executor.map(_worker, regions):
            print(fasta)


if __name__ == '__main__':
    main()