Skip to content

Instantly share code, notes, and snippets.

@daler
Created October 1, 2013 00:01
Show Gist options
  • Save daler/6772074 to your computer and use it in GitHub Desktop.
Save daler/6772074 to your computer and use it in GitHub Desktop.
Implement the merging algorithm from Zhang et al (2013) Mol Cell 51(6): 792-806
import numpy as np
import logging
logging.basicConfig(format='%(message)s', level=logging.DEBUG)
class Merger(object):
def __init__(self, bin_size=5, diff_thresh=0.3,
start_thresh=5., extend_thresh=3.):
"""
Implements the merging alrogithm described in [1] for a NumPy array.
Briefly, a "seed" is chosen as the highest-valued sliding window of
size `bin_size` whose mean is at least `start_thresh`. This seed is
then grown one value at a time. For each iteration, first the up- and
then the downstream values are checked. The seed is extended up- or
downstream if the next value is greater than `extend_thresh` and if the
new extended region's mean value is within `diff_thresh` of the
original seed's mean value.
A final extended region is returned when it can no longer be extended
further in either direction.
This procedure is repeated recursively for all remaining seeds that
satisfy the initial conditions.
After initializing an instance of this class with the criteria you'd
like to use, the primary method to use is `recursive_extend()`.
Parameters
----------
bin_size : int
use sliding windows of this size.
diff_thresh : float
Only append a new bin if the resulting region will at least this
fraction of the original max bin
start_thresh : float
Sliding windows that have less than this value will not be
considered candidates for a seed.
extend_thresh : float
Only append a new bin if its value is at least this value.
References
----------
[1] Zhang et al (2013) Mol Cell 51(6): 792-806
"""
self.diff_thresh = diff_thresh
self.start_thresh = start_thresh
self.extend_thresh = extend_thresh
self.bin_size = bin_size
def debug_print(self, x):
"""
Pretty-prints `x` for debugging and testing purposes.
Parameters
----------
x : 1-D NumPy array
Notes
-----
Each line contains one sliding window. The format is
(dn, up) | [array] | RPKM=z
where (up, dn) are the indices for which `x[up, dn] = array`, and RPKM
is the mean value for each sliding window.
"""
windows = self.make_windows(x)
window_rpkms = windows.mean(axis=1)
for ind, z in enumerate(zip(window_rpkms, windows)):
i, j = z
j = ', '.join(['{0:>4}'.format(k) for k in j])
print '({2:>3}, {3:>3}) | {1} | RPKM={0:<6}'\
.format(i, j, ind, ind + self.bin_size)
def make_windows(self, x):
"""
Construct sliding windows of size `self.bin_size` across `x`.
Parameters
----------
x : 1-D NumPy array
Returns
-------
2-D NumPy array of sliding windows
"""
windows = []
for i in range(0, len(x) - (self.bin_size - 1)):
windows.append(
x[i:i + self.bin_size]
)
return np.array(windows)
def extend(self, x):
"""
Perform one round of seed-and-extend.
Identifies the max-valued sliding window to use as a seed, and extends
it as far as possible given the criteria.
Parameters
----------
x : 1-D NumPy array
Returns
-------
(dn, up) : tuple of ints
Lower and upper boundary indices such that `x[dn:up]` is the full
extent of the extended seed.
"""
windows = self.make_windows(x)
window_rpkms = windows.mean(axis=1)
max_ind = None
# Go in reverse (so we start with the highest bins), stopping at the
# first one that meets our criteria
for ind in np.argsort(window_rpkms)[::-1]:
if x[ind] >= self.start_thresh:
max_ind = ind
break
# Didn't find any bins that fit the criteria, so exit quickly.
if max_ind is None:
return None, None
dn = max_ind
up = dn + self.bin_size
region = x[dn:up]
orig_rpkm = region.mean()
if orig_rpkm < self.start_thresh:
return None, None
# up and dn are the current boundaries of the region
logging.warning("initial: %s, %s" % (dn, up))
# Always attempt up and down; the body of the while-loop will disable
# this as appropriate.
tryup = True
trydn = True
while True:
upchanged = False
dnchanged = False
if tryup:
new_up = up + 1
# make sure it's in range
if new_up < len(x):
# make sure that, by extending, the mean doesn't drop too
# low
if (x[dn:new_up].mean() / orig_rpkm) > self.diff_thresh:
# make sure the next bin is high enough to consider.
# Note: here for the `up`, we subtract 1 since in all
# other contexts `up` is used for half-open slices --
# but here it's used as a single index.
if x[new_up - 1] > self.extend_thresh:
up = new_up
upchanged = True
logging.warning('extend up to %s', up)
else:
logging.warning(
"next up bin %s too small", new_up)
# Never want to try going up again!
tryup = False
else:
logging.warning(
"up %s bin makes mean too low (%s vs %s)",
new_up,
(x[dn:new_up].mean() / orig_rpkm), orig_rpkm
)
# Don't make tryup = False here, since adding
# downstream values could potentially boost the mean
# high enough to be OK.
else:
logging.warning("next up %s out of range", new_up)
# Never want to try going up again!
tryup = False
# Same logic as above for "up"
if trydn:
new_dn = dn - 1
if new_dn > 0:
if (x[new_dn:up].mean() / orig_rpkm) > self.diff_thresh:
if x[new_dn] > self.extend_thresh:
dn = new_dn
dnchanged = True
logging.warning('extend dn to %s', dn)
else:
logging.warning("next dn bin %s too small", new_dn)
trydn = False
else:
logging.warning("dn bin %s makes mean too low", new_dn)
logging.warning(
"dn %s bin makes mean too low (%s vs %s)",
new_dn,
(x[new_dn:up].mean() / orig_rpkm), orig_rpkm
)
else:
logging.warning("next dn %s out of range", dn)
trydn = False
# Break condition is if the region stayed the same
if not (upchanged or dnchanged):
break
logging.warning("final: %s, %s" % (dn, up))
return dn, up
def recursive_extend(self, x, items=None):
"""
Returns a list of all mutually-exclusive sub-regions that are each
extended as far as they can be given the provided criteria.
Parameters
----------
x : 1-D NumPy array
Returns
-------
List of (up, dn) tuples, where each tuple describes an extended region
in `x` that statisfies the provided criteria.
"""
if items is None:
items = []
# We'll be setting things to zero, so make a copy
xcopy = x.copy()
# Get boundaries of extended region
dn, up = self.extend(xcopy)
# None found; we're done here
if (dn is None) and (up is None):
return items
# If valid boundaries were found, set that region to zero so it's not
# found again, and then call recursively with the new zeroed-out array.
else:
xcopy[dn:up + 1] = 0
items.append((dn, up))
return self.recursive_extend(xcopy, items)
if __name__ == "__main__":
example = np.array([3, 3, 0, 0, 0, 3, 4, 5, 6, 7, 8, 9, 10, 99, 0, 0, 0, 3,
2, 1, 1, 0, 50, 60, 70, 80, 1, 4, 5, 0, 0, 5, 5, 6, 5,
5, 0, 0, 0], dtype=float)
m = Merger(bin_size=5)
result = m.recursive_extend(example)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment