Create a gist now

Instantly share code, notes, and snippets.

@yanhan /Cdf.py Secret
Last active Sep 19, 2015

What would you like to do?
Code for Exercise 5.13 Part 1 of Think Stats
"""This file contains code for use with "Think Stats",
by Allen B. Downey, available from greenteapress.com
Copyright 2008 Allen B. Downey.
Distributed under the GNU General Public License at gnu.org/licenses/gpl.html.
"""
"""Functions for building CDFs (cumulative distribution functions)."""
import bisect
import math
import random
import Pmf
class Cdf(object):
"""Represents a cumulative distribution function.
Attributes:
xs: sequence of values
ps: sequence of probabilities
name: string used as a graph label.
"""
def __init__(self, xs=None, ps=None, name=''):
self.xs = [] if xs is None else xs
self.ps = [] if ps is None else ps
self.name = name
def Values(self):
"""Returns a sorted list of values.
"""
return self.xs
def Items(self):
"""Returns a sorted sequence of (value, probability) pairs.
Note: in Python3, returns an iterator.
"""
return zip(self.xs, self.ps)
def Append(self, x, p):
"""Add an (x, p) pair to the end of this CDF.
Note: this us normally used to build a CDF from scratch, not
to modify existing CDFs. It is up to the caller to make sure
that the result is a legal CDF.
"""
self.xs.append(x)
self.ps.append(p)
def Prob(self, x):
"""Returns CDF(x), the probability that corresponds to value x.
Args:
x: number
Returns:
float probability
"""
if x < self.xs[0]: return 0.0
index = bisect.bisect(self.xs, x)
p = self.ps[index-1]
return p
def Value(self, p):
"""Returns InverseCDF(p), the value that corresponds to probability p.
Args:
p: number in the range [0, 1]
Returns:
number value
"""
if p < 0 or p > 1:
raise ValueError('Probability p must be in range [0, 1]')
if p == 0: return self.xs[0]
if p == 1: return self.xs[-1]
index = bisect.bisect(self.ps, p)
if p == self.ps[index-1]:
return self.xs[index-1]
else:
return self.xs[index]
def Percentile(self, p):
"""Returns the value that corresponds to percentile p.
Args:
p: number in the range [0, 100]
Returns:
number value
"""
return self.Value(p / 100.0)
def Random(self):
"""Chooses a random value from this distribution."""
return self.Value(random.random())
def Sample(self, n):
"""Generates a random sample from this distribution.
Args:
n: int length of the sample
"""
return [self.Random() for i in range(n)]
def Mean(self):
"""Computes the mean of a CDF.
Returns:
float mean
"""
old_p = 0
total = 0.0
for x, new_p in zip(self.xs, self.ps):
p = new_p - old_p
total += p * x
old_p = new_p
return total
def _Round(self, multiplier=1000.0):
"""
An entry is added to the cdf only if the percentile differs
from the previous value in a significant digit, where the number
of significant digits is determined by multiplier. The
default is 1000, which keeps log10(1000) = 3 significant digits.
"""
# TODO(write this method)
pass
def Render(self):
"""Generates a sequence of points suitable for plotting.
An empirical CDF is a step function; linear interpolation
can be misleading.
Returns:
tuple of (xs, ps)
"""
xs = [self.xs[0]]
ps = [0.0]
for i, p in enumerate(self.ps):
xs.append(self.xs[i])
ps.append(p)
try:
xs.append(self.xs[i+1])
ps.append(p)
except IndexError:
pass
return xs, ps
def MakeCdfFromItems(items, name=''):
"""Makes a cdf from an unsorted sequence of (value, frequency) pairs.
Args:
items: unsorted sequence of (value, frequency) pairs
name: string name for this CDF
Returns:
cdf: list of (value, fraction) pairs
"""
runsum = 0
xs = []
cs = []
for value, count in sorted(items):
runsum += count
xs.append(value)
cs.append(runsum)
total = float(runsum)
ps = [c/total for c in cs]
cdf = Cdf(xs, ps, name)
return cdf
def MakeCdfFromDict(d, name=''):
"""Makes a CDF from a dictionary that maps values to frequencies.
Args:
d: dictionary that maps values to frequencies.
name: string name for the data.
Returns:
Cdf object
"""
return MakeCdfFromItems(d.iteritems(), name)
def MakeCdfFromHist(hist, name=''):
"""Makes a CDF from a Hist object.
Args:
hist: Pmf.Hist object
name: string name for the data.
Returns:
Cdf object
"""
return MakeCdfFromItems(hist.Items(), name)
def MakeCdfFromPmf(pmf, name=None):
"""Makes a CDF from a Pmf object.
Args:
pmf: Pmf.Pmf object
name: string name for the data.
Returns:
Cdf object
"""
if name == None:
name = pmf.name
return MakeCdfFromItems(pmf.Items(), name)
def MakeCdfFromList(seq, name=''):
"""Creates a CDF from an unsorted sequence.
Args:
seq: unsorted sequence of sortable values
name: string name for the cdf
Returns:
Cdf object
"""
hist = Pmf.MakeHistFromList(seq)
return MakeCdfFromHist(hist, name)
# coding=utf-8
# Copyright 2015 Pang Yan Han, Philip
# License: GNU GPLv3 http://www.gnu.org/licenses/gpl.html
from __future__ import print_function
from math import exp, log
from thinkstats import Mean
import random
import scipy.optimize
import Cdf
import myplot
def ex5_13():
# We'll do the simulation 1 year by 1 year
# Each year, about 1 / 1000 people will contract the particular cancer.
# So we assume that for each year, P(person gets cancer) = 1 / 1000
prob_get_cancer = 1.0 / 1000
# Performs a simulation and returns the simulated data
def _perform_simulation(nr_cohorts):
cohort_size = 100
nr_years = 10
simulated_data = []
# we perform simulation one cohort at a time
for _ in range(nr_cohorts):
cancer_cases_in_cohort = 0
# and for each cohort, we perform the simulation 1 year at a
# time for `nr_years` years
for _ in range(nr_years):
nr_contracted_cancer_in_this_round = 0
# Once a patient contracts cancer, he/she is counted as a
# case. Therefore we don't need to perform simulation for
# that person anymore. The `nr_in_cohort_without_cancer`
# stores the number of people in the cohort who have not yet
# contracted cancer (we sound evil here but I don't know how
# else to put it)
nr_in_cohort_without_cancer = cohort_size - cancer_cases_in_cohort
for i in range(nr_in_cohort_without_cancer):
# this simulates a Ber(p) random variable, where p is
# the probability of the person contracting cancer
if random.uniform(0, 1) <= prob_get_cancer:
nr_contracted_cancer_in_this_round += 1
cancer_cases_in_cohort += nr_contracted_cancer_in_this_round
# we're done simulating the cohort and have the total number of
# cancer cases in the cohort over 10 years. append it to our
# data set.
simulated_data.append(cancer_cases_in_cohort)
return simulated_data
# number of cohorts we are simulating
# NOTE: Change this to a number you like.
nr_cohorts = 1000
simulated_data = _perform_simulation(nr_cohorts)
empirical_cdf = Cdf.MakeCdfFromList(
simulated_data,
name="cancer cases"
)
myplot.Cdf(empirical_cdf)
myplot.Save(
root="ex5-13-p1-cdf",
formats=["pdf", "png",],
xlabel="cancer cases",
ylabel="CDF(x)",
title="CDF plot",
)
# Ok, let's try to see if an exponential distribution will be a
# good model for our data.
scale = myplot.Cdf(empirical_cdf, transform="exponential")
myplot.Save(
root="ex5-13-p1-ccdf-logy-scale",
formats=["pdf", "png",],
xlabel="cancer cases",
ylabel="CCDF(x) (log scale)",
title="CCDF plot",
**scale
)
# Ok, let's see if the Pareto will be a good model for our data.
scale = myplot.Cdf(empirical_cdf, transform="pareto")
myplot.Save(
root="ex5-13-p1-ccdf-log-log-scale",
formats=["pdf", "png",],
xlabel="cancer cases (log scale)",
ylabel="CCDF(x) (log scale)",
title="CCDF plot (log log scale)",
**scale
)
# Let's do a CDF plot with log x scale to see if the resulting plot
# looks similar to that of a normal CDF. If yes, then the Log-normal
# distribution will be a good model for the data.
scale = myplot.Cdf(empirical_cdf, xscale="log")
myplot.Save(
root="ex5-13-p1-cdf-logx-scale",
formats=["pdf", "png",],
xlabel="cancer cases (log scale)",
ylabel="CDF",
title="CDF plot (log x scale)",
**scale
)
# Let's see if the Weibull distribution will be a good model for our
# data.
scale = myplot.Cdf(empirical_cdf, transform="weibull")
myplot.Save(
root="ex5-13-p1-one-over-log-ccdf-log-log-scale",
formats=["pdf", "png",],
xlabel="cancer cases (log scale)",
ylabel="log(1 / CCDF(x)) (log scale)",
title="log(1 / CCDF) plot (log log scale)",
**scale
)
# K, seems like the Exponential and Weibull distributions are the
# best models for our data.
#
# We use the Method of Moments and MLE and obtained the exact same
# parameter for lambda, which is 1 / Xbar .
#
# So we shall use an Exponential distribution with
# lambda = 1 / Xbar as our model and plot it with our data and see
# if it's a good fit.
# First, we compute lambda = 1 / Xbar
xBar = Mean(simulated_data)
exp_lambda = 1.0 / xBar
# Here, we plot the Exp(lambda) CDF, over increments of 0.01 to make
# it more smooth when we plot it. Otherwise, we'll be getting a plot
# which looks similar to the empirical CDF, which is only defined at
# discrete points.
max_val = max(simulated_data)
exp_lambda_pdf = lambda x: exp_lambda * exp(-exp_lambda * x)
val_to_density = {}
i, incr = 0, 0.01
while i <= max_val:
val_to_density[i] = exp_lambda_pdf(i)
i += incr
exp_model_cdf = Cdf.MakeCdfFromDict(val_to_density,
name="model"
)
myplot.Cdfs([empirical_cdf, exp_model_cdf], transform="exponential")
# Plot with log y scale of:
# 1. Empirical CCDF
# 2. CCDF of Exp(lambda) model
scale = dict(yscale="log")
myplot.Save(
root="ex5-13-p1-ccdf-with-ccdf-of-exp-model-logy-scale",
formats=["pdf", "png",],
xlabel="cancer cases",
ylabel="CCDF(x) (log scale)",
title="CCDF & CCDF of Exp({}) model (log y scale)".format(
exp_lambda
),
**scale
)
# Plot with:
# 1. Empirical CDF
# 2. Exp(lambda) model
myplot.Cdfs([empirical_cdf, exp_model_cdf])
myplot.Save(
root="ex5-13-p1-cdf-with-exp-model",
formats=["pdf", "png",],
xlabel="cancer cases",
ylabel="CDF",
title="CDF & Exp({}) model".format(exp_lambda),
)
# Perform another simulation to get test data, we'll be needing this
# later when we validate our models
test_data = _perform_simulation(nr_cohorts)
test_data_cdf = Cdf.MakeCdfFromList(test_data, name="test data")
# Exp model shifted to the left by `exp_model_shift`, where
# `exp_model_shift` is defined as:
#
# P(Exp(lambda) <= exp_model_shift)
# = probability of cohorts having 0 cancer cases
exp_model_shift = exp_model_cdf.Value(empirical_cdf.Prob(0))
# Redo the Exp(lambda) CDF. We start off by computing the density
# function from `i = exp_model_shift` instead of 0, until `i`
# reaches `max_val + exp_model_shift`.
# However, those densities will be attributed to
# `i - exp_model_shift` instead of to `i` because we want to shift
# the CDF by `exp_model_shift` units to the left on the x-axis.
i, incr, stop = exp_model_shift, 0.01, max_val + exp_model_shift
val_to_density = {}
while i <= stop:
val_to_density[i - exp_model_shift] = exp_lambda_pdf(i)
i += incr
# For CDF(0) of our Exponential model to start at
# P(cohorts with <= 0 cancer cases), we need to sum all the
# densities from 0 to `exp_model_shift` and add them to the density
# at `exp_model_shift`. We do so in the
# `sum_of_densities_from_zero_to_exp_model_shift` variable.
sum_of_densities_from_zero_to_exp_model_shift = 0
i = 0
while i < exp_model_shift:
sum_of_densities_from_zero_to_exp_model_shift += \
exp_lambda_pdf(i)
i += incr
# then we add it to density at the `exp_model_shift` value which is
# indexed by 0 in the `val_to_density` dict
val_to_density[0] = val_to_density[0] + \
sum_of_densities_from_zero_to_exp_model_shift
shifted_exp_model_cdf = Cdf.MakeCdfFromDict(val_to_density,
name="model"
)
myplot.Cdfs([empirical_cdf, shifted_exp_model_cdf])
myplot.Save(
root="ex5-13-p1-cdf-with-shifted-exp-model",
formats=["pdf", "png",],
xlabel="cancer cases",
ylabel="CDF",
title="CDF with Exp({}) model shifted by {} to the left".format(
exp_lambda, exp_model_shift
),
)
# A good chunk of our simulated data consists of zeros.
# For parameter estimation using the Maximum Likelihood Estimate, we
# need to perform some calculations involving log on the data.
# However, we know that log(0) is undefined. So we remove all zeros
# from the data set.
X_without_zero = list(filter(lambda x: x > 0, simulated_data))
X_without_zero_len = len(X_without_zero)
log_X_without_zero = [log(x) for x in X_without_zero]
one_over_n_sum_of_log_X_without_zero = \
sum(log_X_without_zero) / float(X_without_zero_len)
# equation we're trying to find roots
def _eqn_for_k(k):
numer = 0.0
for i in range(X_without_zero_len):
numer += (X_without_zero[i] ** k) * (log_X_without_zero[i])
denom = 0.0
for i in range(X_without_zero_len):
denom += (X_without_zero[i] ** k)
return 1.0 / (numer / denom - one_over_n_sum_of_log_X_without_zero) - k
# Plot graph from x = 0.1 to 10 to see roughly where it crosses
# y = 0. Values of x close to where the graph crosses y = 0 will be
# a good initial guess for `scipy.optimize.fsolve`.
xs, ys = [], []
x, incr, stop = 0.1, 0.1, 10
while x <= stop:
xs.append(x)
ys.append(_eqn_for_k(x))
x += incr
myplot.Plot(xs, ys, label="plot")
myplot.Save(
root="ex5-13-p1-weibull-k-parameter-plot",
formats=["pdf", "png",],
xlabel="x",
ylabel="y",
title="weibull k parameter plot",
)
# This is a rough estimate from looking at the k parameter plot
initial_guess = 2.0
weibull_model_shape = scipy.optimize.fsolve(_eqn_for_k,
[initial_guess]
)[0]
weibull_model_scale_on_full_data = (
sum((x ** weibull_model_shape for x in simulated_data)) / \
float(nr_cohorts)
) ** (1.0 / weibull_model_shape)
weibull_model_scale_on_non_zero_data = (
sum((x ** weibull_model_shape for x in X_without_zero)) / \
float(X_without_zero_len)
) ** (1.0 / weibull_model_shape)
print("shape = {}, scale on full data = {}, scale on non-zero data = {}".format(
weibull_model_shape, weibull_model_scale_on_full_data,
weibull_model_scale_on_non_zero_data
))
def _weibull_pdf(shape, scale, x):
return (shape / scale * (x / scale) ** (shape - 1) *
exp(-((x / scale) ** shape))
)
for scale, root_suffix in [
(weibull_model_scale_on_full_data, "scale-on-full-data",),
(weibull_model_scale_on_non_zero_data, "scale-on-non-zero-data",),
]:
weibull_val_to_density = {}
i, incr, max_val = 0, 0.01, max(simulated_data)
while i <= max_val:
weibull_val_to_density[i] = _weibull_pdf(
weibull_model_shape, scale, i
)
i += incr
weibull_model_cdf = Cdf.MakeCdfFromDict(weibull_val_to_density,
name="model"
)
myplot.Cdfs([empirical_cdf, weibull_model_cdf])
myplot.Save(
root="ex5-13-p1-cdf-with-weibull-model-{}".format(
root_suffix
),
formats=["pdf", "png",],
xlabel="cancer cases",
ylabel="CDF(x)",
title="CDF & Weibull model",
)
weibull_val_minus_one_to_density = dict(
map(lambda x: (x[0] - 1, x[1]),
weibull_val_to_density.items()
)
)
weibull_model_left_shifted_cdf = Cdf.MakeCdfFromDict(
weibull_val_minus_one_to_density, name="model"
)
myplot.Cdfs([empirical_cdf, weibull_model_left_shifted_cdf])
myplot.Save(
root="ex5-13-p1-cdf-with-weibull-model-left-shifted-by-one-{}".format(
root_suffix
),
formats=["pdf", "png",],
xlabel="cancer cases",
ylabel="CDF(x)",
title="CDF & Weibull model (left shifted by 1 unit)",
)
sum_of_negative_and_zero_density = sum(
map(lambda y: y[1],
filter(lambda x: x[0] <= 0,
weibull_val_minus_one_to_density.items()
)
)
)
weibull_val_to_density = dict(
filter(lambda x: x[0] > 0,
weibull_val_minus_one_to_density.items()
)
)
weibull_val_to_density[0] = sum_of_negative_and_zero_density
weibull_model_negative_grouped_with_zero_cdf = Cdf.MakeCdfFromDict(
weibull_val_to_density, name="model"
)
myplot.Cdfs(
[empirical_cdf, weibull_model_negative_grouped_with_zero_cdf]
)
myplot.Save(
root="ex5-13-p1-cdf-with-weibull-model-negative-densities-accum-with-zero-{}".format(root_suffix),
formats=["pdf", "png",],
xlabel="cancer cases",
ylabel="CDF(x)",
title="CDF & Weibull model (r.v's <= 0 are grouped under 0)",
)
# See if things generalize for the Weibull distribution
myplot.Cdfs([test_data_cdf, weibull_model_negative_grouped_with_zero_cdf])
myplot.Save(
root="ex5-13-p1-test-data-cdf-with-weibull-model-negative-densities-accum-with-zero-{}".format(root_suffix),
formats=["pdf", "png",],
xlabel="cancer cases",
ylabel="CDF(x)",
title="Test data CDF & Weibull model",
)
# see if things generalize for the Exponential distribution
myplot.Cdfs([test_data_cdf, shifted_exp_model_cdf])
myplot.Save(
root="ex5-13-p1-test-data-cdf-with-shifted-exp-model",
formats=["pdf", "png",],
xlabel="cancer cases",
ylabel="CDF(x)",
title="Test data CDF & Exp({}) model shifted by {} to the left".format(
exp_lambda, exp_model_shift
),
)
if __name__ == "__main__":
ex5_13()
"""This file contains code for use with "Think Stats",
by Allen B. Downey, available from greenteapress.com
Copyright 2010 Allen B. Downey
License: GNU GPLv3 http://www.gnu.org/licenses/gpl.html
"""
import math
import matplotlib
import matplotlib.pyplot as pyplot
import numpy as np
# customize some matplotlib attributes
#matplotlib.rc('figure', figsize=(4, 3))
#matplotlib.rc('font', size=14.0)
#matplotlib.rc('axes', labelsize=22.0, titlesize=22.0)
#matplotlib.rc('legend', fontsize=20.0)
#matplotlib.rc('xtick.major', size=6.0)
#matplotlib.rc('xtick.minor', size=3.0)
#matplotlib.rc('ytick.major', size=6.0)
#matplotlib.rc('ytick.minor', size=3.0)
class Brewer(object):
"""Encapsulates a nice sequence of colors.
Shades of blue that look good in color and can be distinguished
in grayscale (up to a point).
Borrowed from http://colorbrewer2.org/
"""
color_iter = None
colors = ['#081D58',
'#253494',
'#225EA8',
'#1D91C0',
'#41B6C4',
'#7FCDBB',
'#C7E9B4',
'#EDF8B1',
'#FFFFD9']
# lists that indicate which colors to use depending on how many are used
which_colors = [[],
[1],
[1, 3],
[0, 2, 4],
[0, 2, 4, 6],
[0, 2, 3, 5, 6],
[0, 2, 3, 4, 5, 6],
[0, 1, 2, 3, 4, 5, 6],
]
@classmethod
def Colors(cls):
"""Returns the list of colors.
"""
return cls.colors
@classmethod
def ColorGenerator(cls, n):
"""Returns an iterator of color strings.
n: how many colors will be used
"""
for i in cls.which_colors[n]:
yield cls.colors[i]
raise StopIteration('Ran out of colors in Brewer.ColorGenerator')
@classmethod
def InitializeIter(cls, num):
"""Initializes the color iterator with the given number of colors."""
cls.color_iter = cls.ColorGenerator(num)
@classmethod
def ClearIter(cls):
"""Sets the color iterator to None."""
cls.color_iter = None
@classmethod
def GetIter(cls):
"""Gets the color iterator."""
return cls.color_iter
def PrePlot(num=None, rows=1, cols=1):
"""Takes hints about what's coming.
num: number of lines that will be plotted
"""
if num:
Brewer.InitializeIter(num)
# TODO: get sharey and sharex working. probably means switching
# to subplots instead of subplot.
# also, get rid of the gray background.
if rows > 1 or cols > 1:
pyplot.subplots(rows, cols, sharey=True)
global SUBPLOT_ROWS, SUBPLOT_COLS
SUBPLOT_ROWS = rows
SUBPLOT_COLS = cols
def SubPlot(plot_number):
pyplot.subplot(SUBPLOT_ROWS, SUBPLOT_COLS, plot_number)
class InfiniteList(list):
"""A list that returns the same value for all indices."""
def __init__(self, val):
"""Initializes the list.
val: value to be stored
"""
list.__init__(self)
self.val = val
def __getitem__(self, index):
"""Gets the item with the given index.
index: int
returns: the stored value
"""
return self.val
def Underride(d, **options):
"""Add key-value pairs to d only if key is not in d.
If d is None, create a new dictionary.
d: dictionary
options: keyword args to add to d
"""
if d is None:
d = {}
for key, val in options.iteritems():
d.setdefault(key, val)
return d
def Clf():
"""Clears the figure and any hints that have been set."""
Brewer.ClearIter()
pyplot.clf()
def Figure(**options):
"""Sets options for the current figure."""
Underride(options, figsize=(6, 8))
pyplot.figure(**options)
def Plot(xs, ys, style='', **options):
"""Plots a line.
Args:
xs: sequence of x values
ys: sequence of y values
style: style string passed along to pyplot.plot
options: keyword args passed to pyplot.plot
"""
color_iter = Brewer.GetIter()
if color_iter:
try:
options = Underride(options, color=color_iter.next())
except StopIteration:
print 'Warning: Brewer ran out of colors.'
Brewer.ClearIter()
options = Underride(options, linewidth=3, alpha=0.8)
pyplot.plot(xs, ys, style, **options)
def Scatter(xs, ys, **options):
"""Makes a scatter plot.
xs: x values
ys: y values
options: options passed to pyplot.scatter
"""
options = Underride(options, color='blue', alpha=0.2,
s=30, edgecolors='none')
pyplot.scatter(xs, ys, **options)
def Pmf(pmf, **options):
"""Plots a Pmf or Hist as a line.
Args:
pmf: Hist or Pmf object
options: keyword args passed to pyplot.plot
"""
xs, ps = pmf.Render()
if pmf.name:
options = Underride(options, label=pmf.name)
Plot(xs, ps, **options)
def Pmfs(pmfs, **options):
"""Plots a sequence of PMFs.
Options are passed along for all PMFs. If you want different
options for each pmf, make multiple calls to Pmf.
Args:
pmfs: sequence of PMF objects
options: keyword args passed to pyplot.plot
"""
for pmf in pmfs:
Pmf(pmf, **options)
def Hist(hist, **options):
"""Plots a Pmf or Hist with a bar plot.
Args:
hist: Hist or Pmf object
options: keyword args passed to pyplot.bar
"""
# find the minimum distance between adjacent values
xs, fs = hist.Render()
width = min(Diff(xs))
if hist.name:
options = Underride(options, label=hist.name)
options = Underride(options,
align='center',
linewidth=0,
width=width)
pyplot.bar(xs, fs, **options)
def Hists(hists, **options):
"""Plots two histograms as interleaved bar plots.
Options are passed along for all PMFs. If you want different
options for each pmf, make multiple calls to Pmf.
Args:
hists: list of two Hist or Pmf objects
options: keyword args passed to pyplot.plot
"""
for hist in hists:
Hist(hist, **options)
def Diff(t):
"""Compute the differences between adjacent elements in a sequence.
Args:
t: sequence of number
Returns:
sequence of differences (length one less than t)
"""
diffs = [t[i+1] - t[i] for i in range(len(t)-1)]
return diffs
def Cdf(cdf, complement=False, transform=None, **options):
"""Plots a CDF as a line.
Args:
cdf: Cdf object
complement: boolean, whether to plot the complementary CDF
transform: string, one of 'exponential', 'pareto', 'weibull', 'gumbel'
options: keyword args passed to pyplot.plot
Returns:
dictionary with the scale options that should be passed to
myplot.Save or myplot.Show
"""
xs, ps = cdf.Render()
scale = dict(xscale='linear', yscale='linear')
for s in ['xscale', 'yscale']:
if s in options:
scale[s] = options.pop(s)
if transform == 'exponential':
complement = True
scale['yscale'] = 'log'
if transform == 'pareto':
complement = True
scale['yscale'] = 'log'
scale['xscale'] = 'log'
if complement:
ps = [1.0-p for p in ps]
if transform == 'weibull':
xs.pop()
ps.pop()
ps = [-math.log(1.0-p) for p in ps]
scale['xscale'] = 'log'
scale['yscale'] = 'log'
if transform == 'gumbel':
xs.pop(0)
ps.pop(0)
ps = [-math.log(p) for p in ps]
scale['yscale'] = 'log'
if cdf.name:
options = Underride(options, label=cdf.name)
Plot(xs, ps, **options)
return scale
def Cdfs(cdfs, complement=False, transform=None, **options):
"""Plots a sequence of CDFs.
cdfs: sequence of CDF objects
complement: boolean, whether to plot the complementary CDF
transform: string, one of 'exponential', 'pareto', 'weibull', 'gumbel'
options: keyword args passed to pyplot.plot
"""
for cdf in cdfs:
Cdf(cdf, complement, transform, **options)
def Contour(obj, pcolor=False, contour=True, imshow=False, **options):
"""Makes a contour plot.
d: map from (x, y) to z, or object that provides GetDict
pcolor: boolean, whether to make a pseudocolor plot
contour: boolean, whether to make a contour plot
imshow: boolean, whether to use pyplot.imshow
options: keyword args passed to pyplot.pcolor and/or pyplot.contour
"""
try:
d = obj.GetDict()
except AttributeError:
d = obj
Underride(options, linewidth=3, cmap=matplotlib.cm.Blues)
xs, ys = zip(*d.iterkeys())
xs = sorted(set(xs))
ys = sorted(set(ys))
X, Y = np.meshgrid(xs, ys)
func = lambda x, y: d.get((x, y), 0)
func = np.vectorize(func)
Z = func(X, Y)
x_formatter = matplotlib.ticker.ScalarFormatter(useOffset=False)
axes = pyplot.gca()
axes.xaxis.set_major_formatter(x_formatter)
if pcolor:
pyplot.pcolormesh(X, Y, Z, **options)
if contour:
cs = pyplot.contour(X, Y, Z, **options)
pyplot.clabel(cs, inline=1, fontsize=10)
if imshow:
extent = xs[0], xs[-1], ys[0], ys[-1]
pyplot.imshow(Z, extent=extent, **options)
def Pcolor(xs, ys, zs, pcolor=True, contour=False, **options):
"""Makes a pseudocolor plot.
xs:
ys:
zs:
pcolor: boolean, whether to make a pseudocolor plot
contour: boolean, whether to make a contour plot
options: keyword args passed to pyplot.pcolor and/or pyplot.contour
"""
Underride(options, linewidth=3, cmap=matplotlib.cm.Blues)
X, Y = np.meshgrid(xs, ys)
Z = zs
x_formatter = matplotlib.ticker.ScalarFormatter(useOffset=False)
axes = pyplot.gca()
axes.xaxis.set_major_formatter(x_formatter)
if pcolor:
pyplot.pcolormesh(X, Y, Z, **options)
if contour:
cs = pyplot.contour(X, Y, Z, **options)
pyplot.clabel(cs, inline=1, fontsize=10)
def Config(**options):
"""Configures the plot.
Pulls options out of the option dictionary and passes them to
title, xlabel, ylabel, xscale, yscale, xticks, yticks, axis, legend,
and loc.
"""
title = options.get('title', '')
pyplot.title(title)
xlabel = options.get('xlabel', '')
pyplot.xlabel(xlabel)
ylabel = options.get('ylabel', '')
pyplot.ylabel(ylabel)
if 'xscale' in options:
pyplot.xscale(options['xscale'])
if 'xticks' in options:
pyplot.xticks(options['xticks'])
if 'yscale' in options:
pyplot.yscale(options['yscale'])
if 'yticks' in options:
pyplot.yticks(options['yticks'])
if 'axis' in options:
pyplot.axis(options['axis'])
loc = options.get('loc', 0)
legend = options.get('legend', True)
if legend:
pyplot.legend(loc=loc)
def Show(**options):
"""Shows the plot.
For options, see Config.
options: keyword args used to invoke various pyplot functions
"""
# TODO: figure out how to show more than one plot
Config(**options)
pyplot.show()
def Save(root=None, formats=None, **options):
"""Saves the plot in the given formats.
For options, see Config.
Args:
root: string filename root
formats: list of string formats
options: keyword args used to invoke various pyplot functions
"""
Config(**options)
if formats is None:
formats = ['pdf', 'eps']
if root:
for fmt in formats:
SaveFormat(root, fmt)
Clf()
def SaveFormat(root, fmt='eps'):
"""Writes the current figure to a file in the given format.
Args:
root: string filename root
fmt: string format
"""
filename = '%s.%s' % (root, fmt)
print 'Writing', filename
pyplot.savefig(filename, format=fmt, dpi=300)
# provide aliases for calling functons with lower-case names
preplot = PrePlot
subplot = SubPlot
clf = Clf
figure = Figure
plot = Plot
scatter = Scatter
pmf = Pmf
pmfs = Pmfs
hist = Hist
hists = Hists
diff = Diff
cdf = Cdf
cdfs = Cdfs
contour = Contour
pcolor = Pcolor
config = Config
show = Show
save = Save
def main():
color_iter = Brewer.ColorGenerator(7)
for color in color_iter:
print color
if __name__ == '__main__':
main()
"""This file contains code for use with "Think Stats",
by Allen B. Downey, available from greenteapress.com
Copyright 2010 Allen B. Downey
License: GNU GPLv3 http://www.gnu.org/licenses/gpl.html
"""
"""This file contains class definitions for:
Hist: represents a histogram (map from values to integer frequencies).
Pmf: represents a probability mass function (map from values to probs).
_DictWrapper: private parent class for Hist and Pmf.
"""
import logging
import math
import random
class _DictWrapper(object):
"""An object that contains a dictionary."""
def __init__(self, d=None, name=''):
# if d is provided, use it; otherwise make a new dict
if d == None:
d = {}
self.d = d
self.name = name
def GetDict(self):
"""Gets the dictionary."""
return self.d
def Values(self):
"""Gets an unsorted sequence of values.
Note: one source of confusion is that the keys in this
dictionaries are the values of the Hist/Pmf, and the
values are frequencies/probabilities.
"""
return self.d.keys()
def Items(self):
"""Gets an unsorted sequence of (value, freq/prob) pairs."""
return self.d.items()
def Render(self):
"""Generates a sequence of points suitable for plotting.
Returns:
tuple of (sorted value sequence, freq/prob sequence)
"""
return zip(*sorted(self.Items()))
def Print(self):
"""Prints the values and freqs/probs in ascending order."""
for val, prob in sorted(self.d.iteritems()):
print val, prob
def Set(self, x, y=0):
"""Sets the freq/prob associated with the value x.
Args:
x: number value
y: number freq or prob
"""
self.d[x] = y
def Incr(self, x, term=1):
"""Increments the freq/prob associated with the value x.
Args:
x: number value
term: how much to increment by
"""
self.d[x] = self.d.get(x, 0) + term
def Mult(self, x, factor):
"""Scales the freq/prob associated with the value x.
Args:
x: number value
factor: how much to multiply by
"""
self.d[x] = self.d.get(x, 0) * factor
def Remove(self, x):
"""Removes a value.
Throws an exception if the value is not there.
Args:
x: value to remove
"""
del self.d[x]
def Total(self):
"""Returns the total of the frequencies/probabilities in the map."""
total = sum(self.d.itervalues())
return total
def MaxLike(self):
"""Returns the largest frequency/probability in the map."""
return max(self.d.itervalues())
class Hist(_DictWrapper):
"""Represents a histogram, which is a map from values to frequencies.
Values can be any hashable type; frequencies are integer counters.
"""
def Copy(self, name=None):
"""Returns a copy of this Hist.
Args:
name: string name for the new Hist
"""
if name is None:
name = self.name
return Hist(dict(self.d), name)
def Freq(self, x):
"""Gets the frequency associated with the value x.
Args:
x: number value
Returns:
int frequency
"""
return self.d.get(x, 0)
def Freqs(self):
"""Gets an unsorted sequence of frequencies."""
return self.d.values()
def IsSubset(self, other):
"""Checks whether the values in this histogram are a subset of
the values in the given histogram."""
for val, freq in self.Items():
if freq > other.Freq(val):
return False
return True
def Subtract(self, other):
"""Subtracts the values in the given histogram from this histogram."""
for val, freq in other.Items():
self.Incr(val, -freq)
class Pmf(_DictWrapper):
"""Represents a probability mass function.
Values can be any hashable type; probabilities are floating-point.
Pmfs are not necessarily normalized.
"""
def Copy(self, name=None):
"""Returns a copy of this Pmf.
Args:
name: string name for the new Pmf
"""
if name is None:
name = self.name
return Pmf(dict(self.d), name)
def Prob(self, x, default=0):
"""Gets the probability associated with the value x.
Args:
x: number value
default: value to return if the key is not there
Returns:
float probability
"""
return self.d.get(x, default)
def Probs(self):
"""Gets an unsorted sequence of probabilities."""
return self.d.values()
def Normalize(self, fraction=1.0):
"""Normalizes this PMF so the sum of all probs is 1.
Args:
fraction: what the total should be after normalization
"""
total = self.Total()
if total == 0.0:
raise ValueError('total probability is zero.')
logging.warning('Normalize: total probability is zero.')
return
factor = float(fraction) / total
for x in self.d:
self.d[x] *= factor
def Random(self):
"""Chooses a random element from this PMF.
Returns:
float value from the Pmf
"""
if len(self.d) == 0:
raise ValueError('Pmf contains no values.')
target = random.random()
total = 0.0
for x, p in self.d.iteritems():
total += p
if total >= target:
return x
# we shouldn't get here
assert False
def Mean(self):
"""Computes the mean of a PMF.
Returns:
float mean
"""
mu = 0.0
for x, p in self.d.iteritems():
mu += p * x
return mu
def Var(self, mu=None):
"""Computes the variance of a PMF.
Args:
mu: the point around which the variance is computed;
if omitted, computes the mean
Returns:
float variance
"""
if mu is None:
mu = self.Mean()
var = 0.0
for x, p in self.d.iteritems():
var += p * (x - mu)**2
return var
def Log(self):
"""Log transforms the probabilities."""
m = self.MaxLike()
for x, p in self.d.iteritems():
self.Set(x, math.log(p/m))
def Exp(self):
"""Exponentiates the probabilities."""
m = self.MaxLike()
for x, p in self.d.iteritems():
self.Set(x, math.exp(p-m))
def MakeHistFromList(t, name=''):
"""Makes a histogram from an unsorted sequence of values.
Args:
t: sequence of numbers
name: string name for this histogram
Returns:
Hist object
"""
hist = Hist(name=name)
[hist.Incr(x) for x in t]
return hist
def MakeHistFromDict(d, name=''):
"""Makes a histogram from a map from values to frequencies.
Args:
d: dictionary that maps values to frequencies
name: string name for this histogram
Returns:
Hist object
"""
return Hist(d, name)
def MakePmfFromList(t, name=''):
"""Makes a PMF from an unsorted sequence of values.
Args:
t: sequence of numbers
name: string name for this PMF
Returns:
Pmf object
"""
hist = MakeHistFromList(t, name)
return MakePmfFromHist(hist)
def MakePmfFromDict(d, name=''):
"""Makes a PMF from a map from values to probabilities.
Args:
d: dictionary that maps values to probabilities
name: string name for this PMF
Returns:
Pmf object
"""
pmf = Pmf(d, name)
pmf.Normalize()
return pmf
def MakePmfFromHist(hist, name=None):
"""Makes a normalized PMF from a Hist object.
Args:
hist: Hist object
name: string name
Returns:
Pmf object
"""
if name is None:
name = hist.name
# make a copy of the dictionary
d = dict(hist.GetDict())
pmf = Pmf(d, name)
pmf.Normalize()
return pmf
def MakePmfFromCdf(cdf, name=None):
"""Makes a normalized Pmf from a Cdf object.
Args:
cdf: Cdf object
name: string name for the new Pmf
Returns:
Pmf object
"""
if name is None:
name = cdf.name
pmf = Pmf(name=name)
prev = 0.0
for val, prob in cdf.Items():
pmf.Incr(val, prob-prev)
prev = prob
return pmf
def MakeMixture(pmfs, name='mix'):
"""Make a mixture distribution.
Args:
pmfs: Pmf that maps from Pmfs to probs.
name: string name for the new Pmf.
Returns: Pmf object.
"""
mix = Pmf(name=name)
for pmf, prob in pmfs.Items():
for x, p in pmf.Items():
mix.Incr(x, p * prob)
return mix
numpy==1.9.2
matplotlib==1.4.3
scipy==0.16.0
"""This file contains code for use with "Think Stats",
by Allen B. Downey, available from greenteapress.com
Copyright 2010 Allen B. Downey
License: GNU GPLv3 http://www.gnu.org/licenses/gpl.html
"""
import bisect
import random
def Mean(t):
"""Computes the mean of a sequence of numbers.
Args:
t: sequence of numbers
Returns:
float
"""
return float(sum(t)) / len(t)
def MeanVar(t):
"""Computes the mean and variance of a sequence of numbers.
Args:
t: sequence of numbers
Returns:
tuple of two floats
"""
mu = Mean(t)
var = Var(t, mu)
return mu, var
def Trim(t, p=0.01):
"""Trims the largest and smallest elements of t.
Args:
t: sequence of numbers
p: fraction of values to trim off each end
Returns:
sequence of values
"""
n = int(p * len(t))
t = sorted(t)[n:-n]
return t
def Jitter(values, jitter=0.5):
"""Jitters the values by adding a uniform variate in (-jitter, jitter)."""
return [x + random.uniform(-jitter, jitter) for x in values]
def TrimmedMean(t, p=0.01):
"""Computes the trimmed mean of a sequence of numbers.
Side effect: sorts the list.
Args:
t: sequence of numbers
p: fraction of values to trim off each end
Returns:
float
"""
t = Trim(t, p)
return Mean(t)
def TrimmedMeanVar(t, p=0.01):
"""Computes the trimmed mean and variance of a sequence of numbers.
Side effect: sorts the list.
Args:
t: sequence of numbers
p: fraction of values to trim off each end
Returns:
float
"""
t = Trim(t, p)
mu, var = MeanVar(t)
return mu, var
def Var(t, mu=None):
"""Computes the variance of a sequence of numbers.
Args:
t: sequence of numbers
mu: value around which to compute the variance; by default,
computes the mean.
Returns:
float
"""
if mu is None:
mu = Mean(t)
# compute the squared deviations and return their mean.
dev2 = [(x - mu)**2 for x in t]
var = Mean(dev2)
return var
def Binom(n, k, d={}):
"""Compute the binomial coefficient "n choose k".
Args:
n: number of trials
k: number of successes
d: map from (n,k) tuples to cached results
Returns:
int
"""
if k == 0:
return 1
if n == 0:
return 0
try:
return d[n, k]
except KeyError:
res = Binom(n-1, k) + Binom(n-1, k-1)
d[n, k] = res
return res
class Interpolator(object):
"""Represents a mapping between sorted sequences; performs linear interp.
Attributes:
xs: sorted list
ys: sorted list
"""
def __init__(self, xs, ys):
self.xs = xs
self.ys = ys
def Lookup(self, x):
"""Looks up x and returns the corresponding value of y."""
return self._Bisect(x, self.xs, self.ys)
def Reverse(self, y):
"""Looks up y and returns the corresponding value of x."""
return self._Bisect(y, self.ys, self.xs)
def _Bisect(self, x, xs, ys):
"""Helper function."""
if x <= xs[0]:
return ys[0]
if x >= xs[-1]:
return ys[-1]
i = bisect.bisect(xs, x)
frac = 1.0 * (x - xs[i-1]) / (xs[i] - xs[i-1])
y = ys[i-1] + frac * 1.0 * (ys[i] - ys[i-1])
return y
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment