# Source: GitHub gist danibrear/c1ec3a6301420bff5f2ade88d74c0b8a
# Author: @danibrear, created February 26, 2017 18:45
from probability_solution import (
make_power_plant_net,
set_probability,
get_alarm_prob,
get_gauge_prob,
get_temperature_prob,
get_marginal_temp_prob,
get_game_network,
calculate_posterior,
Gibbs_sampler,
MH_sampler,
compare_sampling,
sampling_question,
)
import unittest
from numpy import random
"""
Contains various local tests for Assignment 3.
"""
#Part 1a
def network_setup_test(power_plant):
    """Test that the power plant network has the proper number of nodes and edges.

    Args:
        power_plant: network object exposing a ``nodes`` collection; every
            node must expose ``children`` and ``parents`` collections.

    Returns:
        True when the network has exactly 5 nodes and 10 links in total,
        False otherwise. A message describing the outcome is printed.
    """
    nodes = power_plant.nodes
    if len(nodes) != 5:
        print('incorrect number of nodes')
        return False
    print('correct number of nodes')

    # A link is tallied from both ends (children of one node, parents of the
    # other), so -- provided each edge is registered on both of its
    # endpoints -- 10 links correspond to 5 directed edges.
    total_links = sum(len(n.children) + len(n.parents) for n in nodes)
    if total_links != 10:
        print('incorrect number of edges between nodes')
        return False
    print('correct number of edges between nodes')
    return True
#Part 1b
def probability_setup_test(power_plant):
    """Test that all nodes in the power plant network have proper probability distributions.

    Nodes are looked up by name, so they have to be called exactly
    'temperature', 'faulty gauge', 'faulty alarm', 'gauge' and 'alarm'.
    The temperature, faulty gauge and faulty alarm tables are checked for
    size and for selected values; the gauge and alarm tables are checked for
    size only, because the order of their probabilities is not guaranteed.

    Args:
        power_plant: network object exposing ``get_node_by_name(name)``;
            each returned node must expose ``dist.table``.

    Returns:
        True when every check passes. False at the first failed check,
        including when a required node cannot be found. A message is
        printed for every check performed.
    """
    # Probabilities are compared with a small absolute tolerance rather than
    # by truncating p*100 to an int: truncation rejects correct values such
    # as 1 - 0.8 (which is 0.19999999999999996 in floating point).
    tolerance = 1e-6

    def matches(actual, expected):
        # True only when `actual` is within tolerance of `expected`
        # (a NaN never matches).
        return abs(actual - expected) < tolerance

    def find_node(name):
        # Fetch a node by name, reporting it when it does not exist.
        node = power_plant.get_node_by_name(name)
        if node is None:
            print('No node with the name {} exists.'.format(name))
        return node

    def check_prior(name, label, expected_first):
        # Validate a two-entry table whose first entry is `expected_first`.
        node = find_node(name)
        if node is None:
            return False
        table = node.dist.table
        if len(table) != 2:
            print('incorrect {} distribution size'.format(label))
            return False
        print('correct {} distribution size'.format(label))
        if not matches(table[0], expected_first):
            print('incorrect {} distribution'.format(label))
            return False
        print('correct {} distribution'.format(label))
        return True

    def check_shape_only(name, label):
        # Validate only that the node's table has shape (2, 2, 2).
        node = find_node(name)
        if node is None:
            return False
        if tuple(node.dist.table.shape) != (2, 2, 2):
            print('incorrect {} distribution size'.format(label))
            return False
        print('correct {} distribution size'.format(label))
        return True

    # first test temperature distribution
    print('checking probability distribution for Temperature node...')
    if not check_prior('temperature', 'temperature', 0.8):
        return False

    # then faulty gauge distribution
    print('checking probability distribution for Faulty Gauge node...')
    faulty_gauge = find_node('faulty gauge')
    if faulty_gauge is None:
        return False
    fg_table = faulty_gauge.dist.table
    # Comparing the whole shape (instead of unpacking it) reports a table of
    # the wrong rank as a size error rather than raising ValueError.
    if tuple(fg_table.shape) != (2, 2):
        print('incorrect faulty gauge distribution size')
        return False
    print('correct faulty gauge distribution size')
    if not (matches(fg_table[0][1], 0.05) and matches(fg_table[1][0], 0.2)):
        print('incorrect faulty gauge distribution')
        return False
    print('correct faulty gauge distribution')

    # faulty alarm distribution
    print('checking probability distribution for Faulty Alarm node...')
    if not check_prior('faulty alarm', 'faulty alarm', 0.85):
        return False

    # gauge distribution
    # can't test exact probabilities because
    # order of probabilities is not guaranteed
    print('checking only the probability distribution size for Gauge node...')
    if not check_shape_only('gauge', 'gauge'):
        return False

    # alarm distribution
    print('checking only the probability distribution size for Alarm node...')
    if not check_shape_only('alarm', 'alarm'):
        return False

    return True
#Part 2a Test
def games_network_test(games_net):
    """Test the structure and probability tables of the games network.

    Three groups of checks run in order: the node and link counts, the
    distribution of team node 'A', and the distribution of match node 'AvB'.
    Nodes are looked up by name, so they have to be named predictably. The
    same checks can be applied to teams 'B'/'C' and matches 'BvC'/'CvA' by
    changing the looked-up names.

    Args:
        games_net: network object exposing ``nodes`` (each with ``children``
            and ``parents``) and ``get_node_by_name(name)``; the returned
            nodes must expose ``dist.table``.

    Returns:
        True when every check passes, False at the first failed check. A
        message is printed for every check performed.
    """
    # Absolute tolerance for comparing probabilities; avoids the rounding
    # surprises of truncating p*100 to an int.
    tolerance = 1e-6

    print('checking the total number of edges and nodes in the network...')
    nodes = games_net.nodes
    if len(nodes) != 6:
        print('incorrect number of nodes')
        return False
    print('correct number of nodes')
    # Links are tallied from both ends (children and parents).
    total_links = sum(len(n.children) + len(n.parents) for n in nodes)
    if total_links != 12:
        print('incorrect number of edges')
        return False
    print('correct number of edges')

    # First testing team distributions.
    print('checking probability distribution for Team A...')
    team_node = games_net.get_node_by_name('A')
    if team_node is None:
        print('No node with the name A exists.')
        return False
    team_table = team_node.dist.table
    if len(team_table) != 4:
        print('incorrect team distribution size')
        return False
    print('correct team distribution size')
    first_ok = abs(team_table[0] - 0.15) < tolerance
    third_ok = abs(team_table[2] - 0.30) < tolerance
    if not (first_ok and third_ok):
        print('incorrect team distributions')
        return False
    print('correct team distribution')

    # Now testing match distributions.
    print('checking probability distribution for match AvB...')
    match_node = games_net.get_node_by_name('AvB')
    if match_node is None:
        print('No node with the name AvB exists.')
        return False
    match_table = match_node.dist.table
    # Comparing the whole shape reports a wrong-rank table as a size error
    # instead of raising ValueError while unpacking.
    if tuple(match_table.shape) != (4, 4, 3):
        print('incorrect match distribution size')
        return False
    print('correct match distribution size')

    # The first two indices are the two parents' values (presumably the
    # skill levels of the two teams -- confirm against the network
    # definition); the last index selects the outcome.
    equal_skill_ok = True
    higher_skill_ok = True
    for i in range(4):
        for j in range(4):
            outcome = match_table[i][j]
            if i == j:
                # Same level on both sides: outcomes 0 and 1 must be equally likely.
                if outcome[0] != outcome[1]:
                    equal_skill_ok = False
            elif j > i:
                # Second index larger: outcome 1 must be the most likely.
                if not (outcome[1] > outcome[0] and outcome[1] > outcome[2]):
                    higher_skill_ok = False
            else:
                # First index larger: outcome 0 must be the most likely.
                if not (outcome[0] > outcome[1] and outcome[0] > outcome[2]):
                    higher_skill_ok = False

    # The equal-skill failure is reported in preference to the ordering one.
    if not equal_skill_ok:
        print('incorrect match distribution for equal skill levels')
        return False
    if not higher_skill_ok:
        print('incorrect match distribution: team with higher skill should have higher probability of winning')
        return False
    print('correct match distribution')
    return True
#Part 2b Test
def posterior_test(posterior, expected=(0.25, 0.42, 0.31), tolerance=0.01):
    """Check a computed posterior against reference values.

    Args:
        posterior: indexable sequence of probabilities; it must have at
            least as many entries as ``expected``.
        expected: reference probabilities, compared position by position.
            Defaults to the assignment's reference posterior.
        tolerance: maximum absolute difference allowed per entry
            (exclusive).

    Returns:
        True when every compared entry differs from its reference by less
        than ``tolerance``, False otherwise. The outcome is printed; on
        failure the per-entry absolute differences are printed too.

    Raises:
        IndexError: if ``posterior`` has fewer entries than ``expected``.
    """
    # Compute each difference once; they feed both the check and the report.
    diffs = [abs(posterior[i] - want) for i, want in enumerate(expected)]
    if all(diff < tolerance for diff in diffs):
        print('correct posterior {}'.format(posterior))
        return True
    print('incorrect posterior calculated {}, diff({})'.format(
        posterior, ','.join('{}'.format(diff) for diff in diffs)))
    return False
class HW3Tests(unittest.TestCase):
    """Local unit tests for the Assignment 3 solution functions."""

    def _report_compare_sampling(self, initial_state):
        """Run compare_sampling from ``initial_state`` and print its results.

        The posterior checks are printed, not asserted, so this only fails
        when compare_sampling (or posterior_test) raises.
        """
        delta = 0.001
        games_network = get_game_network()
        G_conv, MH_conv, G_count, MH_count, MH_rej_count = compare_sampling(
            games_network, initial_state, delta)
        report = (
            '\n'
            'Gibbs Convergence: {}\n'
            'MH Convergence: {}\n'
            'Gibbs Count: {}\n'
            'MH Count: {}\n'
            'MH Rejection Count:{}'
        ).format(G_conv, MH_conv, G_count, MH_count, MH_rej_count)
        print(report)
        print('MH posterior')
        print(posterior_test(MH_conv))
        print('Gibbs posterior')
        print(posterior_test(G_conv))

    def test_network_setup_test(self):
        """The power plant network has the expected node and link counts."""
        power_plant = make_power_plant_net()
        self.assertTrue(network_setup_test(power_plant))

    def test_probability_setup(self):
        """The power plant probability tables pass the setup checks."""
        power_plant = make_power_plant_net()
        set_probability(power_plant)
        self.assertTrue(probability_setup_test(power_plant))

    def test_get_alarm_prob(self):
        """get_alarm_prob(net, True) is 0.25 to 3 decimal places."""
        power_plant = make_power_plant_net()
        set_probability(power_plant)
        alarm_prob = get_alarm_prob(power_plant, True)
        self.assertAlmostEqual(alarm_prob, 0.25, 3)

    def test_get_gauge_prob(self):
        """get_gauge_prob(net, True) is 0.14 to 3 decimal places."""
        power_plant = make_power_plant_net()
        set_probability(power_plant)
        gauge_prob = get_gauge_prob(power_plant, True)
        self.assertAlmostEqual(gauge_prob, 0.14, 3)

    def test_get_temperature_prob(self):
        """get_temperature_prob(net, True) exceeds 0.2."""
        power_plant = make_power_plant_net()
        set_probability(power_plant)
        temp_prob = get_temperature_prob(power_plant, True)
        print('temp probability given the alarm is going off and nothing is faulty: {}'.format(temp_prob))
        self.assertGreater(temp_prob, 0.2)

    def test_get_marginal_temperature_prob(self):
        """get_marginal_temp_prob(net, True) is 0.2 to 4 decimal places."""
        power_plant = make_power_plant_net()
        set_probability(power_plant)
        temp_prob = get_marginal_temp_prob(power_plant, True)
        self.assertAlmostEqual(temp_prob, 0.2, 4)

    def test_games_network(self):
        """The games network passes the structure and table checks."""
        game_network = get_game_network()
        self.assertTrue(games_network_test(game_network))

    def test_posterior(self):
        """calculate_posterior matches the reference posterior."""
        game_network = get_game_network()
        posterior = calculate_posterior(game_network)
        self.assertTrue(posterior_test(posterior))

    def test_gibbs_sampler(self):
        """One Gibbs step (seeded) changes exactly one state entry."""
        initial_state = [0, 0, 0, 0, 0, 0]
        games_network = get_game_network()
        random.seed(12345)
        sample = Gibbs_sampler(games_network, initial_state)
        diffs = [x != y for (x, y) in zip(sample, initial_state)]
        self.assertEqual(sum(diffs), 1)

    def test_gibbs_sampler_none(self):
        """Smoke test: Gibbs_sampler accepts None as the initial state."""
        games_network = get_game_network()
        random.seed(12345)
        Gibbs_sampler(games_network, None)

    def test_gibbs_sampler_blank(self):
        """Smoke test: Gibbs_sampler accepts an empty initial state."""
        games_network = get_game_network()
        random.seed(12345)
        Gibbs_sampler(games_network, [])

    def test_mh_sampler(self):
        """One MH step (seeded) changes more than one state entry."""
        initial_state = [0, 0, 0, 0, 0, 2]
        games_network = get_game_network()
        random.seed(12345)
        sample = MH_sampler(games_network, initial_state)
        diffs = [x != y for (x, y) in zip(sample, initial_state)]
        self.assertGreater(sum(diffs), 1)

    def test_mh_sampler_none(self):
        """Smoke test: MH_sampler accepts None as the initial state."""
        games_network = get_game_network()
        random.seed(12345)
        MH_sampler(games_network, None)

    # Previously also named test_mh_sampler_none, which replaced the method
    # above in the class namespace so the None case was never executed.
    def test_mh_sampler_blank(self):
        """Smoke test: MH_sampler accepts an empty initial state."""
        games_network = get_game_network()
        random.seed(12345)
        MH_sampler(games_network, [])

    def test_sampling_question(self):
        """Print the answer returned by sampling_question."""
        print(sampling_question())

    def test_compare_sampling(self):
        """Report compare_sampling results from an explicit initial state."""
        self._report_compare_sampling([0, 0, 0, 0, 0, 2])

    def test_compare_sampling_None(self):
        """Report compare_sampling results when the initial state is None."""
        self._report_compare_sampling(None)

    def test_compare_sampling_blank_state(self):
        """Report compare_sampling results from an empty initial state."""
        self._report_compare_sampling([])
# End of gist. (GitHub page footer: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")