Created February 11, 2010 14:04
MinMax peak detector
Originally from:
import sys, copy, math
import numpy
# default parameters
class peak:
def __init__(self):
def minmax(ia, window,da=_DEFAULT_AREA_SCALE,threshold=_DEFAULT_AREA_THRESHOLD,mode="area"):
@summary: MinMax peak detector
@param ic: An IonChromatogram object
@type ic: pyms.Peak.List.Class.Alignment
@param noise: A number. The noise estimate
@type noise: FloatType
@param window: A string. The width of the window for peak
detection specified as <NUMBER>s or <NUMBER>m, giving a time
in seconds or minutes, respectively
@type window: StringType
@param peak_factor: A number. The value that is multipled to the
noise level to determine the peak threshold for minimum peak
@type peak_factor: IntType
@return: A list.
@rtype: ListType
# if noise < 0 or peak_factor < 0:
# error("negative argument(s) detected.")
# wing_length = window_sele_points(ic, window, half_window=True)
# print " -> Minmax: using window wing of %d point(s)" % (wing_length)
peak_list = _get_peaks(ia, threshold, wing_length,da,mode)
p_max = max_peaks(ia,peak_list,da)
return peak_list,p_max
def _get_peaks(ia, threshold, wing_length,da,mode):
@summary: Peak detection wrapper
@param ia: An numpy object. Ion chromatogram intensity array
@type ia: numpy.ndarray
@param noise: A number. The noise estimate
@type noise: FloatType
@param peak_factor: A number. The value that is multipled to
the noise level to determine the peak threshold for
minimum peak intensities
@type peak_factor: IntType
@param wing_length: An integer. The width measured in number of
points to the left and right of a point to be considered
for an extreme value
@type wing_length: IntType
@return: A list of maxima points and their corresponding left and right
boundaries. Each maximum is modelled using a list containing threex
elements: the left boundary, maximum point, and right boundary
@rtype: ListType
# print " -> Minmax: noise: %lf peak factor: %lf" % (noise,peak_factor)
# peak_threshold = noise * peak_factor
# print " -> Minmax: prob threshold: %lf " % (threshold)
maxima_indices = _identify_maxima(ia, wing_length, threshold,da,mode)
# print " -> Minmax: found local maxima: %d point(s)" % (len(maxima_indices))
peak_list = _peak_boundaries(ia, maxima_indices, wing_length)
# print " -> Minmax: found peak boundaries: %d point(s)" % (len(peak_list))
_resolve_overlaps(ia, peak_list)
#_prune_peaks(peak_list, wing_length)
_adjust_boundaries(ia, peak_list, wing_length, 1.5 )
peak_area(ia, peak_list)
peak_fwhm(ia, peak_list)
return peak_list
def _prune_peaks(peak_list, wing_length):
@summary: Rejects peaks for which the difference between the apex and
left/right boundary is less than wing_length
@param peak_list: A list of pre-peaks
@type peak_list: ListType
@param wing_length: An integer. The width measured in number of
points to the left and right of a point to be considered
for an extreme value
@type wing_length: IntType
@return: No return value
@rtype: NoneType
prune_peaks = []
for peak in peak_list:
peak_left = peak[LEFT_BOUNDARY]
peak_apex = peak[PEAK_APEX]
peak_right = peak[RIGHT_BOUNDARY]
if (peak_apex - peak_left) < wing_length or \
(peak_right - peak_apex) < wing_length:
for peak in prune_peaks:
def _resolve_overlaps(ia, peak_list):
@summary: Resolves overlapped peaks
@param ia: An numpy object. Ion chromatogram intensity array
@type ia: numpy.ndarray
@param peak_list: A list of pre-peaks
@type peak_list: ListType
@return: No return value
@rtype: NoneType
for ii in range(len(peak_list)-1):
peak_crnt = peak_list[ii]
peak_next = peak_list[ii+1]
if peak_crnt.right > peak_next.left:
_adjust_boundary(ia, peak_crnt, peak_next)
def _adjust_boundary(ia, peak_crnt, peak_next):
@summary: Adjusts the boundary between two overlapping peaks. The new
boundary is set to the minimum between the two peak apexes
@param ia: An numpy object. Ion chromatogram intensity array
@type ia: numpy.ndarray
@param peak_crnt: A list [left,max,right] for peak 1
@type peak_crnt: ListType
@param peak_next: A list [left,max,right] for peak 2
@type peak_next: ListType
@return: No return value
@rtype: NoneType
crnt_apex = peak_crnt.index
next_apex = peak_next.index
minx = numpy.argmin(ia[crnt_apex+1:next_apex])
minv = ia[crnt_apex+1:next_apex][minx]
split_point = crnt_apex + minx + 1
peak_crnt.right = split_point
peak_next.left = split_point
def _identify_maxima(ia, wing_length, peak_threshold,da,mode):
@summary: Returns an array of indices where each index corresponds to
a local maximum. A point is a local maximum if the following
are satisfied:
1. It is equal or greater than any of 'wing_length' points to
its left and to its right
2. It is greater than at least one of the points to its left,
and one of the points to its right (out of total 'wing_length'
3. A point closer to the edge of 'ia' than 'wing_length' cannot
be a local maximum
@param ia: An numpy object. Ion chromatogram intensity array
@type ia: numpy.ndarray
@param wing_length: An integer. The width measured in number of
points to the left and right of a point to be considered
for an extreme value
@type wing_length: IntType
@param peak_threshold: A number. The minimum intensity for a peak,
maxima below this value are rejected
@type peak_threshold: FloatType
@return: An array of indices where each index corresponds to a local maximum
@rtype: ListType
maxima_indices = []
index = 0
for index in xrange(ia.size):
is_maxima = _is_local_maximum(ia, index, wing_length)
if is_maxima:
pt_left = max(0,index-wing_length)
pt_right = min(ia.size-1,index+wing_length)
# print "--> Local maxima: %d %d %d" % (index,pt_left,pt_right)
# print "--> Local maxima: %d %lf %lf %lf" % (index,slice_area,da,peak_threshold)
if mode=="area":
is_large_enough = area >= peak_threshold
is_large_enough = ia[index] >= peak_threshold
if is_large_enough: # local maximum found
maxima_indices.append(index) # append to list
index = index + wing_length + 1 # index skip for wing_length
return maxima_indices
def _peak_boundaries(ia, maxima_indices, wing_length):
@summary: Determines the left and right boundary for each of the maxima.
Returns a list of maxima points and their corresponding left and
right boundaries. Each maxima is modelled using a list of three
elements: the left boundary, maxium point, and right boundary
@param ia: An numpy object. Ion chromatogram intensity array
@type ia: numpy.ndarray
@param maxima_indices: A list contains numpy objects. The list indices of
the maxima points
@type maxima_indices: ListType
@param wing_length: An integer. The width measured in number of
points to the left and right of a point to be considered
for an extreme value
@type wing_length: IntType
@return: A list of triplets. Giving indices for [left_bound, maximum,
@rtype: ListType
peak_list = []
# Catch errors in the list of local maxima.
for crnt_ix in maxima_indices:
if crnt_ix < wing_length or crnt_ix > ia.size-wing_length-1:
error("local maxima too close to the boundary")
for ii in range(len(maxima_indices)-1):
crnt_ix = maxima_indices[ii]
next_ix = maxima_indices[ii+1]
if abs(crnt_ix-next_ix) < wing_length+1:
error("local maxima too close to one another")
for ix in range(len(maxima_indices)):
index = maxima_indices[ix]
# get the previous and next maximum
if ix == 0:
index_prev = 0
index_prev = maxima_indices[ix-1]+1
if ix == len(maxima_indices)-1:
index_next = ia.size
index_next = maxima_indices[ix+1]
left_slice = ia[index_prev:index]
left_slice = left_slice[::-1] # reverse left slice
right_slice = ia[index+1:index_next]
left_min_ix = _get_next_minimum(left_slice, wing_length)
right_min_ix = _get_next_minimum(right_slice, wing_length)
left_boundary = index - left_min_ix - 1
right_boundary = index + right_min_ix + 1
return peak_list
def _get_next_minimum(ia_slice, wing_length):
@summary: Determines the first next minimum in 'ia_slice' based on 'wing_length',
'ia_slice' is processed as index increases until the first minimum
is found. If no minimum is found, the minimum is set to the first
point after 'wing_length' counting from the end of the slice
@param ia_slice: An numpy object. Ion chromatogram intensity
array slice
@type ia_slice: numpy.ndarray
@param wing_length: An integer. The width measured in number of
points to the left and right of a point to be considered
for an extreme value
@type wing_length: IntType
@return: An integer. The index of the minimum value
@rtype: IntType
boundary_index = None
for index in range(ia_slice.size):
if _is_local_minimum(ia_slice, index, wing_length):
boundary_index = index
# if the minimum was not found, set the boundary to the last
# point in the slice. If the two peaks are exactly 'wing_length'
# points appart (the closest possible), the right boundary of
# first peak will be set to the left adjacent point of the apex
# of the second peak. And the left boundary of the second peak
# will be set to the right adjacent point of the apex of the
# first peak apex. If a peak is at the beginning/end of a signal,
# and no local minima are found when searching towards the boundary
# the boundary will be set to the first/last point of the signal.
if boundary_index == None:
boundary_index = ia_slice.size - 1
return boundary_index
# Returns True if the point is a local maximum or False otherwise.
# A point is a local maximum if the following are satisfied:
# 1. It is equal or greater than any of 'wing_length' points to
# its left and to its right.
# 2. It is greater than at least one of the points to its left,
# and one of the points to its right (out of total 'wing_length'
# points).
# 3. A point closer to the edge of 'ia' than 'wing_length' cannot
# be a local maximum.
# @param ia An numpy object, ion chromatogram intensity array.
# @param wing_length An integer. The width measured in number of
# points to the left and right of a point to be considered
# for extreme value.
# @return A boolean value.
def _is_local_maximum(ia, index, wing_length):
@summary: Returns True if the point is a local maximum or False otherwise
A point is a local maximum if the following are satisfied:
1. It is equal or greater than any of 'wing_length' points to
its left and to its right
2. It is greater than at least one of the points to its left,
and one of the points to its right (out of total 'wing_length'
3. A point closer to the edge of 'ia' than 'wing_length' cannot
be a local maximum
@param ia: An numpy object. Ion chromatogram intensity array
@type ia: numpy.ndarray
@param index: An integer.Index
@type index: IntType
@param wing_length: An integer. The width measured in number of
points to the left and right of a point to be considered
for an extreme value
@type wing_length: IntType
@return: If the point is a local maximum
@rtype: BooleanType
if index > ia.size:
error("The index (%d) is out of bounds (max index: %d)." % \
(index, ia.size - 1))
intensity = ia[index]
# Get the index for left/right boundary, fold the edges.
left_index = index - wing_length
if left_index > 0:
left_slice_max = numpy.max(ia[left_index:index])
if left_slice_max < intensity:
right_index = index + wing_length + 1
if right_index < ia.size:
right_slice_max = numpy.max(ia[index+1:right_index])
if right_slice_max < intensity:
return valid
# # Test if local maximum. Points closer to the edges than
# # 'wing_length' cannot be a maximum.
# if left_slice.size < wing_length:
# left_wing_valid = False
# else:
# left_wing_valid = _is_greater_than(left_slice, intensity)
# if right_slice.size < wing_length:
# right_wing_valid = False
# else:
# right_wing_valid = _is_greater_than(right_slice, intensity)
# # Return value: True if both wings passed
# if left_wing_valid and right_wing_valid:
# return True
# else:
# return False
def _is_greater_than(ia, intensity):
@summary: Returns True if 'intensity' is greater than or equal to every
element in 'ia' _and_ is greater than at least one element
@param ia: An numpy object. Ion chromatogram intensity array
@type ia: numpy.ndarray
@param intensity: A number. Intensity to which slice is compared
@type intensity: FloatType
@return: If it is greater
@rtype: BooleanType
if ia.size < 1:
error("empty array for comparison")
# numpy.where(ia <= intensity)[0] returns an array of indices
# refering to elements of 'ia' that are smaller or equal than
# 'intensity'
lteq = numpy.where(ia <= intensity)[0]
lt = numpy.where(ia < intensity)[0]
if lteq.size == ia.size and lt.size > 0:
return True
return False
def _is_local_minimum(ia, index, wing_length):
@summary: Returns True if the point is a local minimum or False otherwise
A point is a local maximum if the following are satisfied:
1. It is equal or smaller than any of 'wing_length' points to
its left and to its right
2. It is smllaer than at least one of the points to its left,
and one of the points to its right (out of total 'wing_length'
3. A point closer to the edge of 'ia' than 'wing_length' cannot
be a local minimum
@param ia: An numpy object. Ion chromatogram intensity array
@type ia: numpy.ndarray
@param index: An integer. Index at which to evaluate if local maximum
@type index: IntType
@param wing_length: An integer. The width measured in number of
points to the left and right of a point to be considered
for an extreme value
@type wing_length: IntType
@return: A boolean value
@rtype: BooleanType
if index > ia.size:
error("The index (%d) is out of bounds (max index: %d)." % \
(index, ia.size - 1))
intensity = ia[index]
# Get the index for left/right boundary, fold the edges.
left_index = index - wing_length
if left_index < 0: left_index = 0
left_slice = ia[left_index:index]
right_index = index + wing_length + 1
if right_index > ia.size: right_index = ia.size
right_slice = ia[index+1:right_index]
# Test if local maximum. Points closer to the edges than
# 'wing_length' cannot be a maximum.
if left_slice.size < wing_length:
left_wing_valid = False
left_wing_valid = _is_less_than(left_slice, intensity)
if right_slice.size < wing_length:
right_wing_valid = False
right_wing_valid = _is_less_than(right_slice, intensity)
# Return value: True if both wings passed
if left_wing_valid and right_wing_valid:
return True
return False
def _is_less_than(ia, intensity):
@summary: Returns True if 'intensity' is less than or equal to every
element in 'ia' _and_ is less than at least one element
@param ia: An numpy object. Ion chromatogram intensity array
@type ia: numpy.ndarray
@param intensity: A number. Intensity to which slice is compared
@type intensity: FloatType
@return: a boolean value
@rtype: BooleanType
if ia.size < 1:
error("empty array for comparison")
# numpy.where(ia >= intensity)[0] returns an array of indices
# refering to elements of 'ia' that are greater or equal to
# 'intensity'
lteq = numpy.where(ia >= intensity)[0]
lt = numpy.where(ia > intensity)[0]
if lteq.size == ia.size and lt.size > 0:
return True
return False
def _adjust_boundaries(ia, peak_list, angle_width=_DEFAULT_ANGLE_PTS, \
@summary: Adjusts peak boundaries by fitting the straight line through the
edge points, and trimming point for which the line formes an angle
with the time axis which is less than the threshold
@param ia: An numpy object. Ion chromatogram intensity array
@type ia: numpy.ndarray
@param angle_width: An integer. The number of point for the best fit
@type angle_width: IntType
@param angle_threshold: A number. The angle threshold for peak boundary
points. A line of best fit drawn through an arbitrary number of
points at the peak boundaries must form an angle greater than
the angle threshold. If not, the boundary point will be trimmed
@type angle_threshold: FloatType
@return: No return value
@rtype: NoneType
for peak_item in peak_list:
left_index = peak_item.left
apex_index = peak_item.index
right_index = peak_item.right
apex = ia[apex_index]
left_slice = ia[left_index:apex_index]
right_slice = ia[apex_index+1:right_index+1]
# Reverse the numpy. _trim_boundary() expects the
# numpy from boundary to apex.
right_slice = right_slice[::-1]
left_slice = _trim_boundary(left_slice, apex, angle_width, \
right_slice = _trim_boundary(right_slice, apex, angle_width, \
peak_item.left = apex_index - left_slice.size
peak_item.right = apex_index + right_slice.size
def _trim_boundary(ia_slice, apex, angle_width, angle_threshold):
@summary: Performs peak boundary trimming
@param ia_slice: An numpy object. Ion chromatogram intensity
array slice representing initial peak boundary
@type ia_slice: numpy.ndarray
@param apex: A number. Intensity at peak apex
@type apex: FloatType
@param angle_width: An integer. The number of point for the best fit
@type angle_width: IntType
@param angle_threshold: A number. The angle threshold for peak boundary
points. A line of best fit drawn through an arbitrary number of
points at the peak boundaries must form an angle greater than
the angle threshold. If not, the boundary point will be trimmed
@type angle_threshold: FloatType
@return: A numpy object. Trimmed peak boundary
@rtype: FloatType
if ia_slice.size <= angle_width: return ia_slice
index = 0
while index + angle_width < ia_slice.size:
x1 = index
x2 = index + angle_width
x_values = numpy.arange(x1, x2+1)
y_values = ia_slice[x_values]
gradient = _best_fit_line(y_values, x_values)[0]
rise = angle_width * gradient
rise = rise / apex
run = angle_width
angle = math.atan(rise/run)
angle = math.degrees(angle)
index = index + 1
if angle > angle_threshold: break
return ia_slice[index-1:]
def _best_fit_line(y_values, x_values=None):
@summary: Calculates the coefficients to a linear line of best fit
@param y_values: A numpy object representing the y-values
@type y_values: numpy.ndarray
@param x_values: A numpy object representing the x-values
@type x_values: numpy.ndarray
@return: A tuple. Best fit coefficients where the first element is
the gradient, and the second is the y-intercept
@rtype: numpy.float64, numpy.float64
if x_values == None:
x_values = numpy.arange(y_values.size)
x_array = numpy.repeat([1], x_values.size)
x_array = numpy.array([x_values, x_array])
x_array = numpy.transpose(x_array)
y_array = copy.deepcopy(y_values)
alpha =, x_array)
beta =, y_array)
alphaM = numpy.asmatrix(alpha)
ialphaA = numpy.asarray(alphaM.I)
coefficients =, beta)
return coefficients[0], coefficients[1]
def peak_area(ia, peaks,da=1):
@summary: Generates a list of peak objects from the list of pre-peaks,
and integrates peak areas
@param ia: A Numpy Array
@type ia: numpy.ndarray
@param peak_list: A list of pre-peaks
@type peak_list: ListType
@param da: area scale
@type peak_list: numpy.float64
@return: A list of peak objects
@rtype: ListType
# ia = ia.get_intensity_array()
for peak in peaks:
pt_left = peak.left
pt_apex = peak.index
pt_right = peak.right
# peak areas is the sum of intensity from peak left
# to its right boundary (boundary points included)
area_slice = ia[pt_left:pt_right+1]
max_slice = numpy.max(ia[pt_left:pt_right+1])
peak.area = area_slice.sum()*da
return True
def peak_fwhm(ia, peaks,da=1):
@summary: Generates a list of peak objects from the list of pre-peaks,
and integrates peak areas
@param ia: A Numpy Array
@type ia: numpy.ndarray
@param peak_list: A list of pre-peaks
@type peak_list: ListType
@param da: area scale
@type peak_list: numpy.float64
@return: A list of peak objects
@rtype: ListType
# ia = ia.get_intensity_array()
for peak in peaks:
pt_left = peak.left
pt_apex = peak.index
pt_right = peak.right
# peak areas is the sum of intensity from peak left
# to its right boundary (boundary points included)
right_slice = ia[pt_apex:pt_right+1]
for i in xrange(len(right_slice)):
if right_slice[i] < float(searcher)/2.:
return True
def max_peaks(ia, peaks,da=1):
for peak in peaks:
if peak.value > max_value: max_value=peak.value
for peak in peaks:
if peak.value==max_value: max_peaks.append(peak)
return max_peaks
