Skip to content

Instantly share code, notes, and snippets.

@codyrioux
Created July 24, 2015 22:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save codyrioux/925cb18b5fd273b8d122 to your computer and use it in GitHub Desktop.
Save codyrioux/925cb18b5fd273b8d122 to your computer and use it in GitHub Desktop.
#
# Drift Generator
#
class DriftingDistributionGenerator(Generator):
"""
A generator which takes two distinct distributions and a changepoint and returns
random variates from the first distribution until it has reached the changepoint
where it linearly drifts to the second distribution.
dist1: A scipy.stats distribution for before changepoint.
kwargs1: A map specifying loc and scale for dist1.
dist2: A scipy.stats distribution for after changepoint.
kwargs2: A map specifying loc and scale for dist2.
changepoint: The number of values to be generated before switching to dist2.
steps: The number of steps to take in the drift.
"""
_position = 0
def __init__(self, dist1, kwargs1, dist2, kwargs2, changepoint, steps):
self._dist1 = dist1
self._kwargs1 = kwargs1
self._dist2 = dist2
self._kwargs2 = kwargs2
self._changepoint = changepoint
self._steps = steps
self._change_gradient = np.linspace(0, 1, self._steps)
def get(self):
self._position += 1
if self._position <= self._changepoint:
return self._dist1.rvs(**self._kwargs1)
elif self._position > self._changepoint and self._position <= self._changepoint + self._steps:
beta = self._change_gradient[self._position - self._changepoint - 1]
return ((1.0 - beta) * self._dist1.rvs(**self._kwargs1)) + (beta * self._dist2.rvs(**self._kwargs2))
else:
return self._dist2.rvs(**self._kwargs2)
#
# Thresh Detector
#
class ThreshDetector(object):
def __init__(self, threshold=0.2, window_length=10, min_training=50):
self._window = deque(maxlen=window_length)
self._threshold = threshold
self._triggered = False
self.changepoint = 0
self._min_training = min_training
self._sum = 0
self._sumsq = 0
self._N = 0
def step(self, datum):
self._window.append(datum)
# Welford's method
self._N += 1
self._sum += datum
self._sumsq += datum ** 2
self._mu = self._sum / self._N
if self._N > self._min_training:
variance = (self._sumsq - self._N * self._mu ** 2) / (self._N - 1)
self._std = math.sqrt(variance)
window_mu = sum(self._window) / len(self._window)
ratio = window_mu / self._mu # TODO: Will fail if mu is zero.
if ratio > (1.0 + self._threshold) or ratio < (1.0 - self._threshold):
self._triggered = True
self.changepoint = self._N
return self._triggered
generator = DataBackedGenerator(vec, changepoint)
detector = ThreshDetector(2.5, 60) # Known good parameters.
simulator = Simulator([generator], detector)
simulator.run(plot=True)
#
# Monte Carlo Detector
#
class WindowedMonteCarloDetector(ChangeDetector):
def __init__(self, len1, len2, samples=1000, confidence=0.95, min_samples=50):
self._window1 = deque(maxlen=len1)
self._window2 = deque(maxlen=len2)
self._confidence = confidence
self._min_samples = min_samples
self._samples = samples
self._N = 0
self._triggered = False
self.changepoint = 0
def step(self, datum):
self._window1.append(datum)
self._window2.append(datum)
self._N += 1
if self._N >= self._min_samples:
diffs = np.zeros(self._samples)
for i in xrange(self._samples):
diffs[i] = random.choice(self._window1) - random.choice(self._window2)
hdi_min, hdi_max = hdi(diffs, self._confidence)
self._triggered = not between(0.0, hdi_min, hdi_max)
if self._triggered:
self.changepoint = self._N
return self._triggered
generator = DataBackedGenerator(vec, changepoint)
detector = WindowedMonteCarloDetector(100, 10, 1000, 0.25)
simulator = Simulator([generator], detector)
simulator.run(plot=True)
#
# Util
#
def calc_min_interval(x, alpha):
"""Internal method to determine the minimum interval of
a given width"""
# Initialize interval
min_int = [None,None]
try:
# Number of elements in trace
n = len(x)
# Start at far left
start, end = 0, int(n*(1-alpha))
# Initialize minimum width to large value
min_width = np.inf
while end < n:
# Endpoints of interval
hi, lo = x[end], x[start]
# Width of interval
width = hi - lo
# Check to see if width is narrower than minimum
if width < min_width:
min_width = width
min_int = [lo, hi]
# Increment endpoints
start +=1
end += 1
return min_int
except IndexError:
print 'Too few elements for interval calculation'
return [None,None]
def hdi(trace, cred_mass=0.95):
hdi_min, hdi_max = calc_min_interval(np.sort(trace), 1.0-cred_mass)
return hdi_min, hdi_max
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment