Created
July 24, 2015 22:17
-
-
Save codyrioux/925cb18b5fd273b8d122 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Drift Generator | |
# | |
class DriftingDistributionGenerator(Generator): | |
""" | |
A generator which takes two distinct distributions and a changepoint and returns | |
random variates from the first distribution until it has reached the changepoint | |
where it linearly drifts to the second distribution. | |
dist1: A scipy.stats distribution for before changepoint. | |
kwargs1: A map specifying loc and scale for dist1. | |
dist2: A scipy.stats distribution for after changepoint. | |
kwargs2: A map specifying loc and scale for dist2. | |
changepoint: The number of values to be generated before switching to dist2. | |
steps: The number of steps to take in the drift. | |
""" | |
_position = 0 | |
def __init__(self, dist1, kwargs1, dist2, kwargs2, changepoint, steps): | |
self._dist1 = dist1 | |
self._kwargs1 = kwargs1 | |
self._dist2 = dist2 | |
self._kwargs2 = kwargs2 | |
self._changepoint = changepoint | |
self._steps = steps | |
self._change_gradient = np.linspace(0, 1, self._steps) | |
def get(self): | |
self._position += 1 | |
if self._position <= self._changepoint: | |
return self._dist1.rvs(**self._kwargs1) | |
elif self._position > self._changepoint and self._position <= self._changepoint + self._steps: | |
beta = self._change_gradient[self._position - self._changepoint - 1] | |
return ((1.0 - beta) * self._dist1.rvs(**self._kwargs1)) + (beta * self._dist2.rvs(**self._kwargs2)) | |
else: | |
return self._dist2.rvs(**self._kwargs2) | |
# | |
# Thresh Detector | |
# | |
class ThreshDetector(object): | |
def __init__(self, threshold=0.2, window_length=10, min_training=50): | |
self._window = deque(maxlen=window_length) | |
self._threshold = threshold | |
self._triggered = False | |
self.changepoint = 0 | |
self._min_training = min_training | |
self._sum = 0 | |
self._sumsq = 0 | |
self._N = 0 | |
def step(self, datum): | |
self._window.append(datum) | |
# Welford's method | |
self._N += 1 | |
self._sum += datum | |
self._sumsq += datum ** 2 | |
self._mu = self._sum / self._N | |
if self._N > self._min_training: | |
variance = (self._sumsq - self._N * self._mu ** 2) / (self._N - 1) | |
self._std = math.sqrt(variance) | |
window_mu = sum(self._window) / len(self._window) | |
ratio = window_mu / self._mu # TODO: Will fail if mu is zero. | |
if ratio > (1.0 + self._threshold) or ratio < (1.0 - self._threshold): | |
self._triggered = True | |
self.changepoint = self._N | |
return self._triggered | |
generator = DataBackedGenerator(vec, changepoint) | |
detector = ThreshDetector(2.5, 60) # Known good parameters. | |
simulator = Simulator([generator], detector) | |
simulator.run(plot=True) | |
# | |
# Monte Carlo Detector | |
# | |
class WindowedMonteCarloDetector(ChangeDetector): | |
def __init__(self, len1, len2, samples=1000, confidence=0.95, min_samples=50): | |
self._window1 = deque(maxlen=len1) | |
self._window2 = deque(maxlen=len2) | |
self._confidence = confidence | |
self._min_samples = min_samples | |
self._samples = samples | |
self._N = 0 | |
self._triggered = False | |
self.changepoint = 0 | |
def step(self, datum): | |
self._window1.append(datum) | |
self._window2.append(datum) | |
self._N += 1 | |
if self._N >= self._min_samples: | |
diffs = np.zeros(self._samples) | |
for i in xrange(self._samples): | |
diffs[i] = random.choice(self._window1) - random.choice(self._window2) | |
hdi_min, hdi_max = hdi(diffs, self._confidence) | |
self._triggered = not between(0.0, hdi_min, hdi_max) | |
if self._triggered: | |
self.changepoint = self._N | |
return self._triggered | |
generator = DataBackedGenerator(vec, changepoint) | |
detector = WindowedMonteCarloDetector(100, 10, 1000, 0.25) | |
simulator = Simulator([generator], detector) | |
simulator.run(plot=True) | |
# | |
# Util | |
# | |
def calc_min_interval(x, alpha): | |
"""Internal method to determine the minimum interval of | |
a given width""" | |
# Initialize interval | |
min_int = [None,None] | |
try: | |
# Number of elements in trace | |
n = len(x) | |
# Start at far left | |
start, end = 0, int(n*(1-alpha)) | |
# Initialize minimum width to large value | |
min_width = np.inf | |
while end < n: | |
# Endpoints of interval | |
hi, lo = x[end], x[start] | |
# Width of interval | |
width = hi - lo | |
# Check to see if width is narrower than minimum | |
if width < min_width: | |
min_width = width | |
min_int = [lo, hi] | |
# Increment endpoints | |
start +=1 | |
end += 1 | |
return min_int | |
except IndexError: | |
print 'Too few elements for interval calculation' | |
return [None,None] | |
def hdi(trace, cred_mass=0.95): | |
hdi_min, hdi_max = calc_min_interval(np.sort(trace), 1.0-cred_mass) | |
return hdi_min, hdi_max |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment