Skip to content

Instantly share code, notes, and snippets.

@etscrivner
Created March 23, 2016 21:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save etscrivner/edd81477ffc1f3b6180b to your computer and use it in GitHub Desktop.
Save etscrivner/edd81477ffc1f3b6180b to your computer and use it in GitHub Desktop.
import csv
import math
from lib import statistics
from lib import integration
def get_data(csv_file):
    """Load every data row of a CSV file.

    Arguments:
        csv_file: Path of the CSV file to read. The first row supplies the
            column names used as keys (``csv.DictReader`` default).

    Returns:
        list: One mapping per data row, in file order.
    """
    with open(csv_file, 'r') as handle:
        rows = list(csv.DictReader(handle))
    return rows
class SegmentRange(object):
    """Closed interval whose ends may each be left open-ended.

    A bound of ``None`` means "unbounded on that side". Any other value,
    including ``0`` and ``0.0``, is a real bound. Instances hash and compare
    by their ``(lower, upper)`` pair so they can be used as dictionary keys.
    """

    def __init__(self, lower=None, upper=None):
        """Create a range.

        Arguments:
            lower: Inclusive lower bound, or None for no lower bound.
            upper: Inclusive upper bound, or None for no upper bound.
        """
        self.lower = lower
        self.upper = upper

    def __eq__(self, other):
        """Two ranges are equal when both bounds are equal."""
        if not isinstance(other, SegmentRange):
            return NotImplemented
        return (self.lower, self.upper) == (other.lower, other.upper)

    def __ne__(self, other):
        """Inverse of ``__eq__`` (Python 2 does not derive it automatically)."""
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome

    def __hash__(self):
        """Hash on the bounds, consistent with ``__eq__``."""
        return hash((self.lower, self.upper))

    def __repr__(self):
        """Return ``Range[lower, upper]``, showing missing bounds as -inf/+inf."""
        # Compare against None explicitly: a bound of 0 is falsy but is still
        # a genuine bound and must be printed as such.
        lower_bound = '-inf' if self.lower is None else self.lower
        upper_bound = '+inf' if self.upper is None else self.upper
        return 'Range[{}, {}]'.format(lower_bound, upper_bound)

    def in_range(self, value):
        """Return True when ``value`` lies within the range (bounds inclusive).

        Arguments:
            value: The value to test; must be comparable with the bounds.

        Returns:
            bool: True if ``lower <= value <= upper``, where a ``None`` bound
            imposes no constraint on that side.
        """
        # ``is None`` rather than truthiness, so that a bound of 0 / 0.0 is
        # enforced instead of being mistaken for "no bound".
        above_lower = self.lower is None or value >= self.lower
        below_upper = self.upper is None or value <= self.upper
        return above_lower and below_upper
def build_buckets(num_segments):
    """Split the real line into ``num_segments`` contiguous ranges.

    Each interior cut point is obtained by inverting, via
    ``integration.approximate_inverse``, the integral of
    ``statistics.normal_distribution`` from minus infinity. The targets are
    the cumulative values ``k / num_segments`` for ``k = 1 .. num_segments - 1``.
    NOTE(review): this presumably yields equal-probability segments of the
    standard normal distribution -- confirm against ``lib.statistics``.

    Arguments:
        num_segments: Number of ranges to produce. May be a float; it is
            truncated with ``int()`` when counting cut points.

    Returns:
        list: ``SegmentRange`` objects in ascending order. The first has no
        lower bound and the last has no upper bound.
    """
    probability_per_segment = 1.0 / num_segments
    integrator = integration.Integrator(20, 1E-10)

    def cumulative(x):
        # Integral of the density from -infinity up to x.
        return integrator.integrate_minus_infinity_to(
            statistics.normal_distribution, x)

    cut_points = [
        integration.approximate_inverse(
            cumulative, k * probability_per_segment)
        for k in range(1, int(num_segments))
    ]
    # Pair each cut point with its successor; None marks the open outer ends.
    lower_bounds = [None] + cut_points
    upper_bounds = cut_points + [None]
    return [
        SegmentRange(low, high)
        for low, high in zip(lower_bounds, upper_bounds)
    ]
def spike():
    """Bucket the standardized LOC/Method fixture values and print a fit summary.

    Steps:
      1. Read ``./fixtures/8A/table_d14.csv`` and take the ``LOC/Method``
         column as floats.
      2. Standardize the values with their mean and standard deviation.
      3. Choose the segment count as ``sqrt(n)`` rounded up to a multiple of 5
         and build that many ranges with ``build_buckets``.
      4. Count the values per range and compute
         ``Q = sum((expected - observed)**2 / expected)``.
      5. Integrate the distribution from ``statistics.make_t_distribution``
         up to ``Q`` and print the result ``p`` and ``1 - p``.

    Everything is written to standard output; nothing is returned.
    """
    data = get_data('./fixtures/8A/table_d14.csv')
    object_loc = [float(each['LOC/Method']) for each in data]
    average = statistics.mean(object_loc)
    standard_deviation = statistics.standard_deviation(object_loc)
    normalized_object_loc = [
        (each - average) / standard_deviation for each in object_loc]

    # Kept as a float so the expected-count division below is true division
    # on Python 2 as well.
    number_of_segments = 5.0 * math.ceil(math.sqrt(len(data)) / 5.0)
    results = build_buckets(number_of_segments)
    buckets = {each: 0 for each in results}
    for item in normalized_object_loc:
        # Ranges are inclusive at both ends, so a value exactly on a cut point
        # matches two neighbouring ranges. Walk them in order and stop at the
        # first hit so that every value is counted exactly once.
        for rng in results:
            if rng.in_range(item):
                buckets[rng] += 1
                break

    items_per_bucket = len(normalized_object_loc) / number_of_segments
    print(items_per_bucket)
    chi_squared = sum(
        (items_per_bucket - items_in_bucket) ** 2 / items_per_bucket
        for items_in_bucket in buckets.values())
    print(average)
    print(standard_deviation)
    # Single-argument print calls behave the same on Python 2 and 3; the
    # formatting reproduces the label, a separating space, then the value.
    print('%s %s' % ('Q: ', chi_squared))
    print(buckets)

    integrator = integration.Integrator(20, 1E-10)
    # NOTE(review): Q is a chi-squared statistic, yet it is integrated against
    # a t distribution here. Confirm whether lib.statistics offers a
    # chi-squared density that should be used instead.
    method = statistics.make_t_distribution(number_of_segments - 1)
    p = integrator.integrate_minus_infinity_to(method, chi_squared)
    print('%s %s' % ('p: ', p))
    print(1 - p)
# Run the spike only when this file is executed as a script, not on import.
if __name__ == '__main__':
    spike()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment