Skip to content

Instantly share code, notes, and snippets.

@phobson
Created April 11, 2012 19:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save phobson/2361814 to your computer and use it in GitHub Desktop.
Save phobson/2361814 to your computer and use it in GitHub Desktop.
Env. eng. numpy.ma example
from __future__ import division
import numpy as np
import matplotlib.pyplot as plt
import scipy.stats as stats
#import DataAccess as db
import pdb
def getTestData():
'''
generates test data
'''
raw_data = np.array([
2. , 4.2 , 4.62, 5. , 5. , 5.5 , 5.57, 5.66,
5.75, 5.86, 6.65, 6.78, 6.79, 7.5 , 7.5 , 7.5 ,
8.63, 8.71, 8.99, 9.50, 9.50, 9.85, 10.82, 11.00,
11.25, 11.25, 12.2 , 14.92, 16.77, 17.81, 19.16, 19.19,
19.64, 20.18, 22.97])
raw_mask = np.array([
0 , 0 , 0 , 1 , 1 , 1 , 0 , 0,
1 , 0 , 0 , 0 , 0 , 0 , 0 , 0,
0 , 0 , 0 , 1 , 1 , 0 , 0 , 1,
0 , 0 , 0 , 0 , 0 , 0 , 0 , 0,
0 , 0 , 0])
return np.ma.MaskedArray(data=raw_data, mask=raw_mask)
def rosSort(raw_vals, raw_mask):
'''
prep data for ROS. Sort ascending with non-detects (mask=True)
on top. something like this:
[2, 4, 4, 10, 3, 5, 6, 10, 12, 40, 78, 120]
with [2, 4, 4, 10] being the ND reults (masked the output).
data and mask must be in sync for this not to spit out complete garbage
'''
ndx_d = raw_mask==False
ndx_nd = raw_mask==True
sorted_data = np.hstack([raw_vals[ndx_nd], raw_vals[ndx_d]])
sorted_mask = np.hstack([raw_mask[ndx_nd], raw_mask[ndx_d]])
ros_data = np.ma.MaskedArray(data=sorted_data, mask=sorted_mask)
return ros_data
class MR:
'''
ROS = ranked-order statistics
This class implements the MR method outlined Hirsch and Stedinger, 1987) to estimate
the censored (non-detect) values of a dataset.
Usage:
>>> import ros
>>> myData = ros.MR(<db_table>, <analyte>, testing=False)
Setting the 'testing' parameter to True will provide you with a hard-
coded dataset stored in the ros.getData function. Otherwise, the MR class
will attempt to connect to the International Stormwater BMP database
stored in the present working directory and pull all of the <anaylte>'s
data from the <db_table>.
Creating an MR object will go ahead and estimate the final data values. Continuing
the example above, that data would be accessed via the 'final_data' attribute of the
ros.MR object:
>>> import ros
>>> myData = ros.MR(<db_table>, <analyte>, testing=False)
>>> print(myData.final_data)
The other public attributes of the ros.MR object are as follows:
ros.MR.raw_vals - The raw data pulled from the databse. Detection limits are used
with 'U' or 'UJ' flagged data.
ros.MR.mask - Array of boolean values indicating if a ros.MR.raw_val is censored
ros.MR.data - A numpy MaskedArray object with the censored data masked.
ros.MR.DLs - Array of the uniqie detection limits of the dataset. If a
non-censored value exists that is lower than the minimum detection
limit, it is appended on to this array.
ros.MR.DLIndex - Array of indices for each ros.MR.raw_val that points to the
corresponding detection limit.
ros.MR.nrakks - Array of unaveraged ranks for the data
ros.MR.aranks - Array of averaged (final) ranks for the data
ros.MR.final_data - Array of data where the censored values have been replaced
with estimation.
Called ros.MR.plot() will produce a graph comparing the estimated data with raw data
containing detection limit substituions. The figure will be saved as
<table>_<parameter>.png
'''
def __init__(self, data, testing=False):
assert type(data)==np.ma.core.MaskedArray, 'Data must be a masked array'
self.test = testing
self.raw_vals = data.data
self.raw_mask = data.mask
self.data = rosSort(self.raw_vals, self.raw_mask)
self.DLs = np.unique(self.data.data[self.data.mask])
if self.DLs.shape[0] > 0:
if self.data.min() < self.DLs.min():
self.DLs = np.hstack([self.data.min(), self.DLs])
else:
self.DLs = np.array([])
self.DLIndex = self.__rosDLIndex()
self.nranks, self.aranks = self.__rosRanks()
self.plot_pos, self.final_data, self.Z = self.__rosEstimator()
def __rosRanks(self):
'''
Determine the whacky ranks of the data according to the following logic
rank[n] = rank[n-1] + 1 when:
n is 0 OR
n > 0 and d[n].masked is True and j[n] <> d[n-1] OR
n > 0 and d[n].masked is False and d[n-1].masked is True OR
n > 0 and d[n].masked is False and d[n-1].masked is False and j[n] <> j[n-1]
rank[n] = 1
n > 0 and d[n].masked is True and j[n] == d[n-1] OR
n > 0 and d[n].masked is False and d[n-1].masked is False and j[n] == j[n-1]
where j[n] is the index of the highest DL that is less than the current data value
Then the ranks of non-censored equivalent data values are averaged.
'''
norm_ranks = np.ones(len(self.data), dtype='f2')
for n in range(len(self.data.data)):
if n == 0 \
or self.DLIndex[n] <> self.DLIndex[n-1] \
or self.data.mask[n] <> self.data.mask[n-1]:
norm_ranks[n] = 1
else: #self.DLIndex[n-1] == self.DLIndex[n]:
norm_ranks[n] = norm_ranks[n-1] + 1
avgd_ranks = norm_ranks.copy()
for n in range(len(norm_ranks)):
if not self.data.mask[n]:
index = np.bitwise_and(self.DLIndex==self.DLIndex[n],
self.data==self.data[n])
avgd_ranks[index] = norm_ranks[index].mean()
return norm_ranks, avgd_ranks
def __rosDLIndex(self):
'''
creates an array of indices for the detection limits (self.DLs)
corresponding to each data point
'''
if self.DLs.shape[0] > 0:
j = []
for n in range(len(self.data.data)):
value = self.data.data[n]
censored = self.data.mask[n]
index, = np.where(self.DLs <= value)
j.append(index[-1])
j = np.array(j)
else:
j = np.zeros(len(self.data.data))
return j
def __computeABC(self, det_lims):
'''
Intermediate values needed to compute plotting positions.
TODO: get better terminolgy for these
'''
lowerDL = np.min(det_lims)
upperDL = np.max(det_lims)
A_above = self.data[self.data >= lowerDL]
A_above_and_below = A_above[A_above < upperDL]
A_uncensored = A_above_and_below[A_above_and_below.mask==False]
A = A_uncensored.shape[0]
B_LT = self.data[self.data.data < lowerDL]
B_LTE = self.data[self.data.data <= lowerDL]
B = B_LTE[B_LTE.mask==True].shape[0] + B_LT[B_LT.mask==False].shape[0]
#pdb.set_trace()
censored_below = self.data.data[self.data.mask] == lowerDL
C = censored_below.sum()
return float(A), float(B), float(C)
def __rosEstimator(self):
'''
Estimates the values of the censored data
'''
N_tot = self.data.shape[0]
N_nd = self.data.mask.sum()
plot_pos = np.zeros(N_tot)
if N_nd == 0:
modeled_data = np.array([])
Z = np.array([])
fit = []
elif N_tot - N_nd < 2 or N_nd/N_tot > 0.8:
modeled_data = self.data.data[self.data.mask] * 0.5
Z = np.array([])
fit = []
else:
num_DLs = self.DLs.shape[0]
PE = np.zeros(num_DLs+1)
A = np.zeros(num_DLs)
B = np.zeros(num_DLs)
C = np.zeros(num_DLs)
for j in range(num_DLs-1, -1, -1):
#define A and B
try:
det_lims = (self.DLs[j], self.DLs[j+1])
except IndexError:
det_lims = (self.DLs[j], np.inf)
A[j], B[j], C[j] = self.__computeABC(det_lims)
PE[j] = PE[j+1] + A[j]/(A[j]+B[j]) * (1-PE[j+1])
for n in range(N_tot):
j = self.DLIndex[n]
if self.data.mask[n]:
plot_pos[n] = (1-PE[j]) * self.aranks[n]/(C[j]+1)
else:
plot_pos[n] = (1-PE[j]) + (PE[j] - PE[j+1]) * self.aranks[n] / (A[j]+1)
Zprelim = np.ma.MaskedArray(data=stats.distributions.norm.ppf(plot_pos), mask=self.data.mask)
fit = stats.mstats.linregress(Zprelim, np.log(self.data))
slope, intercept = fit[:2]
modeled_data = np.exp(slope*Zprelim.data[Zprelim.mask] + intercept)
full_data = np.hstack([modeled_data, self.data[self.data.mask==False]])
Zfinal, final_data = stats.probplot(full_data.data, fit=0)
return plot_pos, final_data, Zfinal #, fit, (A, B, C, PE)
def plot(self, filename):
'''
makes a simple plot showing the original and modeled data
'''
fig, ax1 = plt.subplots()
ax1.plot(self.Z[self.data.mask==False], self.data[self.data.mask==False],
'ko', mfc='Maroon', ms=6, label='original detects', zorder=8)
ax1.plot(self.Z[self.data.mask==True], self.data.data[self.data.mask],
'ko', ms=6, label='original non-detects', zorder=8, mfc='none')
ax1.plot(self.Z, self.final_data, 'ks', ms=4, zorder=10,
label='modeled data', mfc='DodgerBlue')
ax1.set_xlabel(r'$Z$-score')
ax1.set_ylabel('concentration')
ax1.set_yscale('log')
ax1.legend(loc='upper left', numpoints=1)
ax1.xaxis.grid(True, which='major', ls='-', lw=0.5, alpha=0.35)
ax1.yaxis.grid(True, which='major', ls='-', lw=0.5, alpha=0.35)
ax1.yaxis.grid(True, which='minor', ls='-', lw=0.5, alpha=0.17)
plt.tight_layout()
fig.savefig(filename)
plt.close(fig)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment