Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save cs224/9a19b4ba2c7511e317be90c32a4d40d7 to your computer and use it in GitHub Desktop.
Save cs224/9a19b4ba2c7511e317be90c32a4d40d7 to your computer and use it in GitHub Desktop.
Simple Bayesian Network via Monte Carlo Markov Chain in PyMC3
import math
import pyjags
import numpy as np
import pandas as pd
np.random.seed(0)
np.set_printoptions(precision=3)
def pyjags_trace():
code = '''
model {
rain ~ dbern(0.2)
sprinkler_p <- ifelse(rain, 0.01, 0.40)
sprinkler ~ dbern(sprinkler_p)
grass_wet_p <- ifelse(rain, ifelse(sprinkler, 0.99, 0.80), ifelse(sprinkler, 0.90, 0.0))
grass_wet ~ dbern(grass_wet_p)
}
'''
model = pyjags.Model(code, data=dict(), chains=4)
samples = model.sample(5000, vars=['rain', 'sprinkler_p', 'sprinkler', 'grass_wet_p', 'grass_wet'])
trace = pd.Panel({k: v.squeeze(0) for k, v in samples.items()})
trace.axes[0].name = 'Variable'
trace.axes[1].name = 'Iteration'
trace.axes[2].name = 'Chain'
return trace
def pyjags_trace_with_evidence():
code = '''
model {
rain ~ dbern(0.2)
sprinkler_p <- ifelse(rain, 0.01, 0.40)
sprinkler ~ dbern(sprinkler_p)
grass_wet_p <- ifelse(rain, ifelse(sprinkler, 0.99, 0.80), ifelse(sprinkler, 0.90, 0.0))
grass_wet ~ dbern(grass_wet_p)
}
'''
grass_wet = 1.0
model = pyjags.Model(code, data=dict(grass_wet=grass_wet), init=dict(rain=1, sprinkler=1), chains=4)
samples = model.sample(5000, vars=['rain', 'sprinkler_p', 'sprinkler', 'grass_wet_p', 'grass_wet'])
trace = pd.Panel({k: v.squeeze(0) for k, v in samples.items()})
trace.axes[0].name = 'Variable'
trace.axes[1].name = 'Iteration'
trace.axes[2].name = 'Chain'
return trace
trace1 = pyjags_trace()
trace2 = pyjags_trace_with_evidence()
def trace_values(ldf):
p_rain_wet = float(ldf[(ldf['rain'] == 1) & (ldf['grass_wet'] == 1)].shape[0]) / ldf[ldf['grass_wet'] == 1].shape[0]
p_sprinkler_wet = float(ldf[(ldf['sprinkler'] == 1) & (ldf['grass_wet'] == 1)].shape[0]) / ldf[(ldf['grass_wet'] == 1)].shape[0]
d = float(ldf[(ldf['sprinkler'] == 0) & (ldf['rain'] == 0)].shape[0])
if math.isclose(d, 0.):
p_not_sprinkler_rain_wet = None
else:
p_not_sprinkler_rain_wet = float(ldf[(ldf['sprinkler'] == 0) & (ldf['rain'] == 0) & (ldf['grass_wet'] == 1)].shape[0]) / d
return (p_rain_wet, p_sprinkler_wet, p_not_sprinkler_rain_wet)
print(trace_values(trace1.to_frame()))
print(trace_values(trace2.to_frame()))
import math
import pymc
import numpy as np
import pandas as pd
import lea
np.random.seed(124)
rain = lea.B(20,100)
sprinkler = lea.Lea.if_(rain, lea.B(1,100), lea.B(40,100))
grassWet = lea.Lea.buildCPT(( ~sprinkler & ~rain, False),
( ~sprinkler & rain, lea.B(80,100)),
( sprinkler & ~rain, lea.B(90,100)),
( sprinkler & rain, lea.B(99,100)))
# lea.P(grassWet & sprinkler & rain) # 99/50000
# lea.Pf(grassWet & sprinkler & rain) # 0.00198
# lea.P(rain.given(grassWet)) # 891/2491
# lea.Pf(rain.given(grassWet)) # 0.3576876756322762
# lea.Pf(rain.given(~grassWet)) # 0.07182480693230847
# lea.Pf(grassWet.given(sprinkler&~rain)) # 0.9
# lea.Pf(grassWet.given(~sprinkler&~rain)) # 0.0
niter = 10000
def pymc_trace():
# Initialization
observed_values = [1.]
rain = pymc.Bernoulli('rain', .2, value=np.ones(len(observed_values)))
sprinkler_p = pymc.Lambda('sprinkler_p', lambda rain=rain: np.where(rain, .01, .4))
# "Real" sprinkler varible
sprinkler = pymc.Bernoulli('sprinkler', sprinkler_p, value=np.ones(len(observed_values)))
grass_wet_p = pymc.Lambda('grass_wet_p', lambda sprinkler=sprinkler, rain=rain: np.where(sprinkler, np.where(rain, .99, .9), np.where(rain, .8, 0.0)))
# grass_wet = pymc.Bernoulli('grass_wet', p_grass_wet, value=observed_values, observed=True)
grass_wet = pymc.Bernoulli('grass_wet', grass_wet_p, value=np.ones(len(observed_values))) # , value=np.ones(len(observed_values))
model = pymc.Model([grass_wet, grass_wet_p, sprinkler, sprinkler_p, rain])
mcmc = pymc.MCMC(model)
mcmc.sample(niter, 2000)
# pymc.Matplot.plot(mcmc)
#
# geweke = pymc.geweke(mcmc)
# pymc.Matplot.geweke_plot(geweke)
dictionary = {
'Rain': [1 if ii[0] else 0 for ii in mcmc.trace('rain')[:].tolist() ],
'Sprinkler': [1 if ii[0] else 0 for ii in mcmc.trace('sprinkler')[:].tolist() ],
'Grass Wet': [1 if ii[0] else 0 for ii in mcmc.trace('grass_wet')[:].tolist() ],
'Sprinkler Probability': [ii[0] for ii in mcmc.trace('sprinkler_p')[:].tolist()],
'Grass Wet Probability': [ii[0] for ii in mcmc.trace('grass_wet_p')[:].tolist()],
}
ldf = pd.DataFrame(dictionary)
return ldf
df1 = pymc_trace()
print(df1.head())
dfX = pymc_trace() # necessary to get "good" values for pymc_trace_with_evidence()
def pymc_trace_with_evidence():
# Initialization
observed_values = [1.]
rain = pymc.Bernoulli('rain', .2, value=np.ones(len(observed_values))) # , value=np.ones(len(observed_values))
sprinkler_p = pymc.Lambda('sprinkler_p', lambda rain=rain: np.where(rain, .01, .4))
# "Real" sprinkler varible
sprinkler = pymc.Bernoulli('sprinkler', sprinkler_p, value=np.ones(len(observed_values))) # , value=np.ones(len(observed_values))
grass_wet_p = pymc.Lambda('grass_wet_p', lambda sprinkler=sprinkler, rain=rain: np.where(sprinkler, np.where(rain, .99, .9), np.where(rain, .8, 0.0)))
# grass_wet = pymc.Bernoulli('grass_wet', p_grass_wet, value=observed_values, observed=True)
grass_wet = pymc.Bernoulli('grass_wet', grass_wet_p, value = observed_values, observed = True) # , value=np.ones(len(observed_values))
model = pymc.Model([grass_wet, grass_wet_p, sprinkler, sprinkler_p, rain])
mcmc = pymc.MCMC(model)
mcmc.sample(niter, 2000)
# pymc.Matplot.plot(mcmc)
#
# geweke = pymc.geweke(mcmc)
# pymc.Matplot.geweke_plot(geweke)
dictionary = {
'Rain': [1 if ii[0] else 0 for ii in mcmc.trace('rain')[:].tolist() ],
'Sprinkler': [1 if ii[0] else 0 for ii in mcmc.trace('sprinkler')[:].tolist() ],
'Grass Wet': [1 for ii in mcmc.trace('grass_wet_p')[:].tolist() ], # always true
'Sprinkler Probability': [ii[0] for ii in mcmc.trace('sprinkler_p')[:].tolist()],
'Grass Wet Probability': [ii[0] for ii in mcmc.trace('grass_wet_p')[:].tolist()],
}
ldf = pd.DataFrame(dictionary)
return ldf
df2 = pymc_trace_with_evidence()
print(df2.head())
def pymc_trace_values(ldf):
# p_rain = ldf[(ldf['Rain'] == 1)].shape[0] / ldf.shape[0]
# print(p_rain)
# p_grass_wet = ldf[(ldf['Grass Wet'] == 1)].shape[0] / ldf.shape[0]
# print(p_grass_wet)
p_rain_wet = float(ldf[(ldf['Rain'] == 1) & (ldf['Grass Wet'] == 1)].shape[0]) / ldf[ldf['Grass Wet'] == 1].shape[0]
p_sprinkler_wet = float(ldf[(ldf['Sprinkler'] == 1) & (ldf['Grass Wet'] == 1)].shape[0]) / ldf[(ldf['Grass Wet'] == 1)].shape[0]
d = float(ldf[(ldf['Sprinkler'] == 0) & (ldf['Rain'] == 0)].shape[0])
if math.isclose(d, 0.):
p_not_sprinkler_rain_wet = None
else:
p_not_sprinkler_rain_wet = float(ldf[(ldf['Sprinkler'] == 0) & (ldf['Rain'] == 0) & (ldf['Grass Wet'] > 0.5)].shape[0]) / d
return (p_rain_wet, p_sprinkler_wet, p_not_sprinkler_rain_wet)
# Given grass is wet, what is the probability that it was rained?
a = lea.Pf(rain.given(grassWet))
# Given grass is wet, what is the probability that sprinkler was opened?
b = lea.Pf(sprinkler.given(grassWet))
# Given that it did not rain and that the sprinkler is off what is the probability that the grass is wet?
c = lea.Pf(grassWet.given(~rain, ~sprinkler))
print((a,b,c))
print(pymc_trace_values(df1))
print(pymc_trace_values(df2))
# Original example in PyMC2:
# https://bugra.github.io/work/notes/2014-05-23/simple-bayesian-network-via-monte-carlo-markov-chain-mcmc-pymc/
import pandas as pd
import pymc3 as pm
model = pm.Model()
with model:
rain = pm.Bernoulli('rain', 0.2)
sprinkler_p = pm.Deterministic('sprinkler_p', pm.math.switch(rain, 0.01, 0.40))
sprinkler = pm.Bernoulli('sprinkler', sprinkler_p)
grass_wet_p = pm.Deterministic('grass_wet_p', pm.math.switch(rain, pm.math.switch(sprinkler, 0.99, 0.80), pm.math.switch(sprinkler, 0.90, 0.0)))
grass_wet = pm.Bernoulli('grass_wet', grass_wet_p)
# start = pm.find_MAP() # Use MAP estimate (optimization) as the initial state for MCMC
step = pm.Metropolis()
trace = pm.sample(100000, step=step, tune=5000, random_seed=123, progressbar=True) # init=start,
pm.traceplot(trace)
dictionary = {
'Rain': [1 if ii else 0 for ii in trace['rain'].tolist() ],
'Sprinkler': [1 if ii else 0 for ii in trace['sprinkler'].tolist() ],
'Sprinkler Probability': [ii for ii in trace['sprinkler_p'].tolist()],
'Grass Wet Probability': [ii for ii in trace['grass_wet_p'].tolist()],
}
df = pd.DataFrame(dictionary)
df.head()
# Given grass is wet, what is the probability that it was rained?
p_rain_wet = float(df[(df['Rain'] == 1) & (df['Grass Wet Probability'] > 0.5)].shape[0]) / df[df['Grass Wet Probability'] > 0.5].shape[0]
print(p_rain_wet)
# Given grass is wet, what is the probability that sprinkler was opened?
p_sprinkler_wet = float(df[(df['Sprinkler'] == 1) & (df['Grass Wet Probability'] > 0.5)].shape[0]) / df[df['Grass Wet Probability'] > 0.5].shape[0]
print(p_sprinkler_wet)
# Given sprinkler is off and it does not rain, what is the probability that grass is wet?
p_not_sprinkler_rain_wet = float(df[(df['Sprinkler'] == 0) & (df['Rain'] == 0) & (df['Grass Wet Probability'] > 0.5)].shape[0]) / df[df['Grass Wet Probability'] > 0.5].shape[0]
print(p_not_sprinkler_rain_wet)
import numpy as np
import pandas as pd
import pymc3 as pm
niter = 10000 # 10000
tune = 5000 # 5000
model = pm.Model()
with model:
tv = [1]
rain = pm.Bernoulli('rain', 0.2, shape=1, testval=tv)
sprinkler_p = pm.Deterministic('sprinkler_p', pm.math.switch(rain, 0.01, 0.40))
sprinkler = pm.Bernoulli('sprinkler', sprinkler_p, shape=1, testval=tv)
grass_wet_p = pm.Deterministic('grass_wet_p', pm.math.switch(rain, pm.math.switch(sprinkler, 0.99, 0.80), pm.math.switch(sprinkler, 0.90, 0.0)))
grass_wet = pm.Bernoulli('grass_wet', grass_wet_p, observed=np.array([1]), shape=1)
trace = pm.sample(20000, step=[pm.BinaryGibbsMetropolis([rain, sprinkler])], tune=tune, random_seed=124)
# pm.traceplot(trace)
dictionary = {
'Rain': [1 if ii[0] else 0 for ii in trace['rain'].tolist() ],
'Sprinkler': [1 if ii[0] else 0 for ii in trace['sprinkler'].tolist() ],
'Sprinkler Probability': [ii[0] for ii in trace['sprinkler_p'].tolist()],
'Grass Wet Probability': [ii[0] for ii in trace['grass_wet_p'].tolist()],
}
df = pd.DataFrame(dictionary)
p_rain = df[(df['Rain'] == 1)].shape[0] / df.shape[0]
print(p_rain)
p_sprinkler = df[(df['Sprinkler'] == 1)].shape[0] / df.shape[0]
print(p_sprinkler)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment