|
""" |
|
Code to run the Gibbs sampler |
|
|
|
@author : Spencer Lyon <spencer.lyon@stern.nyu.edu> |
|
@date : 2014-11-05 13:29:48 |
|
|
|
## Notes |
|
|
|
* When defining the prior for σ_m I needed the mode of the InverseGamma. |
|
This is equal to β / (α + 1). |
|
""" |
|
import Base: mean |
|
|
|
using Distributions |
|
using StateSpace |
|
using MCMC |
|
using HDF5 |
|
|
|
# Mean of a NormalInverseGamma, reported as a tuple:
# (mean of the Normal part, mean of the InverseGamma part).
# Used below when calibrating the prior for m0.
# NOTE(review): this adds a Base.mean method for a Distributions type we
# don't own (mild type piracy) — fine in a script, avoid in a package.
function mean(d::NormalInverseGamma)
    ig_mean = mean(InverseGamma(d.shape, d.scale))
    return (d.mu, ig_mean)
end
|
|
|
## ----------- ## |
|
#- Handle Data -# |
|
## ----------- ## |
|
|
|
# pull in data from pandas
# using Pandas
# cs = Pandas.read_msgpack("cs_data.msg")
# cs_raw = copy(values(cs)) # copy converts to Julia array
# Two columns: column 1 is the "noisy" series, column 2 the "clean" one
# (see the range assertions below); rows are time periods.
cs_raw = readcsv("cs_raw.csv")

# split into training sample and estimation data.
# The first 52 periods of the noisy series are used only to calibrate priors.
train = cs_raw[1:52, 1]
cs_data = cs_raw[53:end, :]

# number of estimation periods
T = size(cs_data, 1)
|
|
|
## ------------- ## |
|
#- Define Priors -# |
|
## ------------- ## |
|
|
|
# dict of priors, keyed by parameter name
priors = Dict{Symbol, Distribution}()
# initial-state priors, centered on training-sample moments
priors[:μ0] = Normal(mean(train), 0.15) # good
priors[:π0] = Normal(mean(train), 0.025) # good
priors[:lnr0] = Normal(log(var(train)), 5.0) # good
priors[:lnq0] = Normal(log(var(train)/25), 5.0) # good
# innovation covariance of the (log r, log q) random walks
priors[:W] = InverseWishart(10.0, 0.05 * eye(2)) # TODO: check this
# priors[:ρ_m] = Normal(0.0, 0.45) # good
# priors[:σ_m] = InverseGamma(2*(0.2)/std(train) - 1, 0.2) # TODO: check this
# Joint prior on (ρ_m, σ_m). The IG shape is chosen so that the mode
# β/(α+1) = 0.2 / (2*0.2/std(train)) equals std(train)/2 (see the note at
# the top of the file about the InverseGamma mode).
priors[:ρm_σm] = NormalInverseGamma(0.0, 0.45, 2*(0.2)/std(train) - 1, 0.2)
# Prior for m0: centered at zero with variance 10 * Eσ_m^2 / (1-μ^2),
# i.e. the AR(1) stationary variance (evaluated at the prior means of
# ρ_m and σ_m), inflated by a factor of 10.
μ, Eσ_m = mean(priors[:ρm_σm])
priors[:m0] = Normal(0.0, 10*Eσ_m^2 / (1-μ^2))
|
|
|
## -------------- ## |
|
#- Initial values -# |
|
## -------------- ## |
|
|
|
function get_init(priors, T)
    # Draw a starting point for every sampled quantity from its prior.
    # State paths get T iid draws; the volatility paths r, q are drawn
    # on the log scale (lnr0/lnq0 priors) and exponentiated.
    init = Dict{Symbol, Any}()
    for (k, pk) in [(:π, :π0), (:μ, :μ0), (:m, :m0)]
        init[k] = rand(priors[pk], T)
    end
    init[:r] = exp(rand(priors[:lnr0], T))
    init[:q] = exp(rand(priors[:lnq0], T))
    init[:W] = rand(priors[:W])
    init[:ρ_m], init[:σ_m] = rand(priors[:ρm_σm])
    return init
end
|
|
|
## --------- ## |
|
#- Data Dict -# |
|
## --------- ## |
|
|
|
# construct data Dict for the model (conditioning info shared by all blocks)
data = Dict()
# prior means of the initial states, used by the sampling blocks below
data["π0"] = mean(priors[:π0])
data["μ0"] = mean(priors[:μ0])
data["m0"] = mean(priors[:m0])

data["T"] = T # defined above when importing data

# Construct ranges of noisy, both, clean dates:
# early periods observe only the noisy series (column 1), a middle stretch
# observes both columns, and late periods observe only the clean series
# (column 2). Missing observations are coded as NaN.
noisy_range = 1:(findfirst(!isnan(cs_data[:, 2])) - 1)
both_range = (noisy_range.stop + 1):(findfirst(isnan(cs_data[:, 1])) -1)
clean_range = (both_range.stop + 1):T

## check to make sure ranges are accurate
# "clean" column is nan in all of noisy_range
@assert Base.all(isnan(cs_data[noisy_range, 2]))

# neither column is nan in both_range
@assert !Base.any(isnan(cs_data[both_range, 1:2][:]))

# "noisy" column is nan in clean_range
@assert Base.all(isnan(cs_data[clean_range, 1]))

# Make the TimeVaryingParam for the data
# tt is just the type of each of element in our comprehensions.
# I didn't want to type it three times.
tt = (Vector{Float64}, UnitRange{Int})

# Each element pairs the period-t observation vector with the range {t}
# it applies to; the vector's length (1 or 2) depends on which series are
# observed at date t.
y = vcat(tt[([cs_data[t, 1]], t:t) for t in noisy_range], # noisy
         tt[(cs_data[t, 1:2][:], t:t) for t in both_range], # both
         tt[([cs_data[t, 2]], t:t) for t in clean_range]) # clean

y = TimeVaryingParam(y...)
data["y"] = y
|
|
|
## ----------------------------- ## |
|
#- Measurement Equation Matrices -# |
|
## ----------------------------- ## |
|
# Now the measurement equation. This would be constant but for the
# changing dimensionality. Only need three C, D matrices.
# State is [π, μ, m] (see pi_mu_m_block! below): the noisy series loads
# on π and m, the clean series loads on π alone.
C_noisy = Float64[1 0 1]
C_both = Float64[1 0 1; 1 0 0]
C_clean = Float64[1 0 0]
Ct = TimeVaryingParam((C_noisy, noisy_range),
                      (C_both, both_range),
                      (C_clean, clean_range))

# measurement error is just machine epsilon
# NOTE(review): 1e-5 is not literally eps(); it is a tiny fixed variance
# so the filter treats the series as (essentially) observed exactly.
D_noisy = Diagonal(fill(1e-5, 1))
D_both = Diagonal(fill(1e-5, 2))
D_clean = Diagonal(fill(1e-5, 1))
Dt = TimeVaryingParam((D_noisy, noisy_range),
                      (D_both, both_range),
                      (D_clean, clean_range))

data["Ct"] = Ct
data["Dt"] = Dt
|
|
|
## ------------- ## |
|
#- Define blocks -# |
|
## ------------- ## |
|
|
|
function rq_metropolis_step!(cur_p, current, proposed, t, ζ)
    # One Metropolis accept/reject step for the date-t volatilities
    # (r[t], q[t]). `current` and `proposed` are 2-vectors holding the
    # current and candidate (r, q) values; ζ holds the innovations the
    # likelihood is evaluated against. Mutates cur_p[:r][t], cur_p[:q][t].

    # diagonal covariance matrices implied by the two candidate draws
    H_star = Diagonal(proposed)
    H_old = Diagonal(current)
    ζ_t = ζ[:, t]

    # acceptance probability: ratio of the zero-mean normal densities of
    # ζ_t under the proposed vs. current covariance matrices
    αt = sqrt(det(H_old)) / sqrt(det(H_star))
    αt *= exp(-0.5*ζ_t'*inv(H_star)*ζ_t)[1]
    αt /= exp(-0.5*ζ_t'*inv(H_old)*ζ_t)[1]

    # accept the proposal with probability min(αt, 1)
    winner = αt > rand() ? proposed : current
    cur_p[:r][t] = winner[1]
    cur_p[:q][t] = winner[2]

    nothing
end
|
|
|
function r_q_block!(m::MCMCGibbsModel)
    # Single-move Metropolis sampler for the stochastic-volatility paths
    # r (measurement variance) and q (state variance) at each date,
    # conditional on the state paths π, μ and the innovation covariance W.
    T = m.data["T"]

    cur_p = m.curr_params # we will use this a lot. Pull out to simplify notation

    # fill in measurement (first row) and state innovations (second row)
    ζ = Array(Float64, 2, T)
    ζ[1, :] = cur_p[:π] - cur_p[:μ]
    ζ[2, 2:end] = cur_p[:μ][2:end] - cur_p[:μ][1:end-1]
    # first state innovation is measured against the prior mean μ0
    ζ[2, 1] = cur_p[:μ][1] - m.data["μ0"]

    W = m.curr_params[:W] # get latest W
    Ω = W .* 0.5 # proposal variance. This is constant across time.

    # allocate space for time-t mean vector
    μt = Array(Float64, 2)

    # run single move sampler on interior points
    for t=2:T-1
        # Pull out data. Construct the mean: under the random walk for
        # (log r, log q), the conditional mean at t given its neighbors
        # is the average of the two neighboring log values.
        μt[1] = (log(cur_p[:r][t-1]) + log(cur_p[:r][t+1])) * 0.5
        μt[2] = (log(cur_p[:q][t-1]) + log(cur_p[:q][t+1])) * 0.5
        v_old = [cur_p[:r][t], cur_p[:q][t]] # current

        # generate proposal on the log scale, then exponentiate
        lnv_star = rand(MvNormal(μt, Ω)) # proposal
        v_star = exp(lnv_star)

        # do metropolis step (updates cur_p[:r][t] and cur_p[:q][t])
        rq_metropolis_step!(cur_p, v_old, v_star, t, ζ)
    end

    # TODO: figure out how to deal with end points. Right now I just use
    # a single observation. t is period of cur_p[:r], cur_p[:q] we are filling
    # in and cond_t is the date we are conditioning on.
    # TODO: both 2, T-1 have been updated this scan already. Should I worry?
    for (t, cond_t) = [(1, 2), (T, T-1)]
        μt[1], μt[2] = log(cur_p[:r][cond_t]), log(cur_p[:q][cond_t])
        v_old = [cur_p[:r][t], cur_p[:q][t]] # current
        lnv_star = rand(MvNormal(μt, Ω)) # proposal
        v_star = exp(lnv_star)
        rq_metropolis_step!(cur_p, v_old, v_star, t, ζ)
    end

    nothing
end
|
|
|
function W_block!(m::MCMCGibbsModel)
    # Conjugate InverseWishart update for W, the innovation covariance of
    # the (log r, log q) random walks, given the current volatility paths.
    r, q = m.curr_params[:r], m.curr_params[:q]

    # random-walk innovations of the two log-volatility paths
    η_r = log(r[2:end]) - log(r[1:end-1])
    η_q = log(q[2:end]) - log(q[1:end-1])
    η = [η_r η_q]  # prior mean is zero, so the update only needs η'η

    prior = m.dists[:W]
    posterior = InverseWishart(m.data["T"] + prior.df,
                               full(prior.Ψ) + η'η)
    m.curr_params[:W] = rand(posterior)
    nothing
end
|
|
|
function pi_mu_m_block!(m::MCMCGibbsModel)
    # Draw the state paths (π, μ, m) jointly with a forward-filter
    # backward-sampler, conditional on (ρ_m, σ_m, r, q).

    # Pull out current parameter values we need to condition on
    ρ_m = m.curr_params[:ρ_m]
    σ_m = m.curr_params[:σ_m]
    r = m.curr_params[:r]
    q = m.curr_params[:q]

    # pull out other data
    T = m.data["T"]
    y = m.data["y"]

    # Transition matrix for the state x_t = [π_t, μ_t, m_t]:
    # π and μ both load on lagged μ; m follows its own AR(1) with ρ_m.
    A = Float64[0 1 0
                0 1 0
                0 0 ρ_m]

    # Time-varying shock loadings: the stochastic volatilities r[t], q[t]
    # enter through the period-t loading matrix, so B must vary with t.
    Bt = Array((Array{Float64, 2}, UnitRange{Int}), T)
    for t = 1:T
        # Fill in period t state covariance matrix
        this_B_t = Float64[sqrt(r[t]) sqrt(q[t]) 0
                           0 sqrt(q[t]) 0
                           0 0 σ_m]
        Bt[t] = (this_B_t, t:t)
    end
    Bt = TimeVaryingParam(Bt...)

    Ct, Dt = m.data["Ct"], m.data["Dt"]

    # Finally construct the state space model
    lgss = LinearGaussianSSabcd(A, Bt, Ct, Dt)

    # Initial state distribution.
    # FIX: read the priors stored on the model (m.dists) instead of the
    # global `priors` dict, so this block is self-contained and consistent
    # with W_block!/rhom_sigmam_block!, which already use m.dists.
    μ0 = Float64[mean(m.dists[i]) for i in [:π0, :μ0, :m0]]
    Σ0 = eye(3)
    x0 = MvNormal(μ0, Σ0)

    # Now run the fwfilter_bwsampler
    new_state = fwfilter_bwsampler(lgss, y, x0)

    # store the sampled paths (rows of new_state) as plain vectors
    m.curr_params[:π] = squeeze(new_state[1, :], 1)
    m.curr_params[:μ] = squeeze(new_state[2, :], 1)
    m.curr_params[:m] = squeeze(new_state[3, :], 1)

    nothing
end
|
|
|
function rhom_sigmam_block!(m::MCMCGibbsModel)
    # Conjugate NormalInverseGamma update for the AR(1) parameters of m:
    #   m_t = ρ_m * m_{t-1} + σ_m * ε_t,
    # a univariate regression of m_t on its own lag with an N-IG prior.
    #
    # NOTE: In multi-dim version V is cov matrix. So I think in scalar version
    # v should be variance.
    # FIX(review): the previous code mixed parameterizations — it computed
    # vp = inv(v + Lm'Lm) (treating v as a *precision*) but used inv(v) in
    # the βp update (treating v as a *variance*). Per the note above, v is
    # the prior variance, so the posterior precision is inv(v) + Lm'Lm.
    # The formulas below use the variance parameterization consistently.

    # pull out prior hyperparameters
    pri = m.dists[:ρm_σm]
    μ = pri.mu
    v = pri.v0
    α = pri.shape
    β = pri.scale

    # pull out data
    m_t = m.curr_params[:m][2:end]
    Lm = m.curr_params[:m][1:end-1] # lagged m (m_{t-1})

    T = m.data["T"] - 1

    # Update suffstats (standard Normal-InverseGamma regression update)
    vp = inv(inv(v) + dot(Lm, Lm))        # posterior variance
    μp = vp * (inv(v)*μ + dot(Lm, m_t))   # posterior mean
    αp = α + T/2
    βp = β + 0.5(dot(m_t, m_t) + μ'inv(v)*μ - μp'inv(vp)*μp)

    # sample
    new_dist = NormalInverseGamma(μp, vp, αp, βp)
    m.curr_params[:ρ_m], m.curr_params[:σ_m] = rand(new_dist)

    nothing
end
|
|
|
## --------------------------------------------------- ## |
|
#- Construct Dict mapping param names to block numbers -# |
|
## --------------------------------------------------- ## |
|
|
|
# map each parameter (or group of jointly sampled parameters) to the
# number of the Gibbs block that updates it
block_sym2num = Dict{Union(Symbol, Vector{Symbol}), Int}()
block_sym2num[[:r, :q]] = 1
block_sym2num[:W] = 2
block_sym2num[[:π, :μ, :m]] = 3
block_sym2num[[:ρ_m, :σ_m]] = 4

## ------------------------------------------------- ##
#- Construct Dict mapping block numbers to functions -#
## ------------------------------------------------- ##

# the sampler calls these in order 1..n_block on every scan
block_funcs = Dict{Int, Function}()
block_funcs[1] = r_q_block!
block_funcs[2] = W_block!
block_funcs[3] = pi_mu_m_block!
block_funcs[4] = rhom_sigmam_block!
|
|
|
## ---------------------------- ## |
|
#- Construct the MCMCGibbsModel -# |
|
## ---------------------------- ## |
|
|
|
# m = MCMCGibbsModel(get_init(priors, T), priors, block_sym2num, block_funcs, data, 4) |
|
|
|
## --------------- ## |
|
#- Run the sampler -# |
|
## --------------- ## |
|
|
|
|
|
#=
_allocate is a helper function that the sampler code calls when
allocating memory for samples of various types.

This method has been implemented for scalars, vectors, and matrices

The trailing dimension of the resultant array will be the length of the
chain T. The leading dimensions will match size(x).

This should be used in conjunction with the _save! method
=#
_allocate{S <: Number}(::S, T::Int) = Array(S, T)
_allocate{S <: Number}(x::Vector{S}, T::Int) = Array(S, length(x), T)
_allocate{S <: Number}(x::Matrix{S}, T::Int) = Array(S, size(x, 1),
                                                     size(x, 2), T)

# _save! stores draw t of parameter nm into the array allocated by
# _allocate above, writing along the trailing (chain) dimension
_save!(d::Dict, nm::Symbol, x::Real, t::Int) = d[nm][t] = x
_save!(d::Dict, nm::Symbol, x::Vector, t::Int) = d[nm][:, t] = x
_save!(d::Dict, nm::Symbol, x::Matrix, t::Int) = d[nm][:, :, t] = x
|
|
|
|
|
# Text-saving hooks mirroring _save!; intentionally no-ops for now.
_save_text!(io::IOStream, nm::Symbol, x::Real, t::Int) = nothing
_save_text!(d::Dict, nm::Symbol, x::Vector, t::Int) = nothing
_save_text!(d::Dict, nm::Symbol, x::Matrix, t::Int) = nothing
|
|
|
function run_me(m::MCMCGibbsModel, r::Range=1:200)
    # Run the Gibbs sampler for last(r) scans, keeping in memory only the
    # draws whose scan index falls in r (so r expresses burn-in/thinning).
    # Returns a Dict mapping parameter name => array of kept draws, with
    # the chain index in the trailing dimension (see _allocate/_save!).
    param_names = keys(m.curr_params)
    samples = Dict{Symbol, Array}()

    T = length(r)
    for nm in param_names
        samples[nm] = _allocate(m.curr_params[nm], T)
    end

    # Print 10 times during simulation
    print_checks = int([(1:10) * (last(r)/10)])

    # t indexes the next free slot in the sample arrays
    t = 1
    for i=1:last(r)
        # update all blocks
        for blk=1:m.n_block
            m.block_funcs[blk](m)
        end

        # if we aren't thinning this time
        if i in r
            # extract new sample
            for nm in param_names
                _save!(samples, nm, m.curr_params[nm], t)
            end
            t += 1
        end
        if i in print_checks
            println("Finished with $i of $(last(r))")
        end
    end
    samples

end

# convenience method: run t scans, keeping every draw
run_me(m::MCMCGibbsModel, t::Int) = run_me(m, 1:t)
|
|
|
|
|
## ------------------------------------------------------- ## |
|
#- Running method to save intermediate results to hdf file -# |
|
## ------------------------------------------------------- ## |
|
|
|
""" |
|
functions to construct groups for HDF5 dataset. |
|
|
|
* f is an open HDF5 file handle (an `HDF5File`, not a file name)
|
* nm is dataset name at root level or with `group/name` syntax |
|
* T is integer for length of dataset |
|
* cs is chunksize |
|
""" |
|
|
|
function _allocate_hdf{S <: Number}(::S, f::HDF5File, nm::String, T::Int,
                                    cs::Int)
    # Create a chunked 1-d dataset of length T for scalar draws, plus an
    # in-memory buffer holding one chunk (cs draws).
    # FIX: the chunk dims were written `(cs)`, which is just the Int cs,
    # not a 1-tuple; the vector/matrix methods pass proper dim tuples, so
    # pass `(cs,)` here for consistency.
    d = d_create(f, nm, datatype(S), dataspace((T, )), "chunk", (cs,))
    a = Array(S, cs)
    return d, a
end
|
|
|
function _allocate_hdf{S <: Number}(x::Vector{S}, f::HDF5File, nm::String,
                                    T::Int, cs::Int)
    # Create a chunked (length(x), T) dataset for vector draws, plus an
    # in-memory buffer holding one chunk (cs draws).
    nrow = length(x)
    d = d_create(f, nm, datatype(S), dataspace(nrow, T), "chunk", (nrow, cs))
    return d, Array(S, nrow, cs)
end
|
|
|
function _allocate_hdf{S <: Number}(x::Matrix{S}, f::HDF5File, nm::String,
                                    T::Int, cs::Int)
    # Create a chunked (size(x)..., T) dataset for matrix draws, plus an
    # in-memory buffer holding one chunk (cs draws).
    nrow, ncol = size(x)
    d = d_create(f, nm, datatype(S), dataspace(nrow, ncol, T), "chunk",
                 (nrow, ncol, cs))
    return d, Array(S, nrow, ncol, cs)
end
|
|
|
|
|
# _save_hdf! writes a block of draws x into dataset g along its trailing
# (chain) dimension, at the positions given by t.

# scalar-valued parameter: buffer is a Vector of draws
function _save_hdf!(g::HDF5Dataset, x::Vector, t::UnitRange)
    g[t] = x
    nothing
end

# vector-valued parameter: draws are the columns of x
function _save_hdf!(g::HDF5Dataset, x::Matrix, t::UnitRange)
    g[:, t] = x
    nothing
end

# matrix-valued parameter: draws are the dim-3 slices of x
function _save_hdf!{T<:Real}(g::HDF5Dataset, x::Array{T, 3}, t::UnitRange)
    g[:, :, t] = x
    nothing
end
|
|
|
# crazy default for cs just says if chunk size is not specified, then we |
|
# should should try to have at least 20 chunks. However, we bound the |
|
# chunksize to be between (min(length(r), 400), 1000). |
|
# crazy default for cs just says if chunk size is not specified, then we
# should try to have at least 20 chunks. However, we bound the
# chunksize to be between (min(length(r), 400), 1000).
function run_hdf(m::MCMCGibbsModel, r::Range=1:200;
                 filename::String="cs2014_$(myid()).h5",
                 cs::Int=0)
    # Run the Gibbs sampler for last(r) scans, keeping the draws whose
    # scan index falls in r (burn-in/thinning), streaming kept draws to
    # an HDF5 file in chunks of cs draws.
    if cs == 0
        # set real default for cs
        cs = min(max(min(int(length(r)/20), 1000), 500), length(r))
    end
    param_names = keys(m.curr_params)
    samples = Dict{Symbol, Array}()   # one in-memory chunk per parameter
    dsets = Dict{Symbol, HDF5Dataset}()
    f = h5open(filename, "w")

    T = length(r)
    for nm in param_names
        dsets[nm], samples[nm] = _allocate_hdf(m.curr_params[nm], f,
                                               string(nm), T, cs)
    end

    # Print 10 times during simulation.
    # FIX: guard against print_skip == 0 (when last(r) < 10), which made
    # `i % print_skip` throw a DivideError
    print_skip = max(1, int(last(r)/10))

    # chunker keep track of which element of the chunk we are on
    chunker = 1

    # fill range is what we will eventually pass to _save_hdf!.
    fill_range = 1:cs

    for i=1:last(r)
        # update all blocks
        for blk=1:m.n_block
            m.block_funcs[blk](m)
        end

        # if we aren't thinning this time
        if i in r
            # if our chunk still isn't totally full, grab new samples
            if chunker <= cs
                for nm in param_names
                    _save!(samples, nm, m.curr_params[nm], chunker)
                end
                chunker += 1
            end

            # check to see if chunk is full.
            if chunker == cs + 1
                # if it is, save the data for this chunk
                for nm in param_names
                    _save_hdf!(dsets[nm], samples[nm], fill_range)
                end

                # increment the fill_range
                fill_range += cs

                # reset the chunker
                chunker = 1
            end

        end
        if i % print_skip == 0
            println("Finished with $i of $(last(r))")
        end
    end

    # FIX: flush a partially filled final chunk. Previously those draws
    # were silently dropped whenever cs did not evenly divide length(r).
    if chunker > 1
        part = chunker - 1
        pr = fill_range.start:(fill_range.start + part - 1)
        for nm in param_names
            s = samples[nm]
            if ndims(s) == 1
                _save_hdf!(dsets[nm], s[1:part], pr)
            elseif ndims(s) == 2
                _save_hdf!(dsets[nm], s[:, 1:part], pr)
            else
                _save_hdf!(dsets[nm], s[:, :, 1:part], pr)
            end
        end
    end

    close(f)
    nothing
end
|
|
|
# convenience method: run t scans, keeping every draw
run_hdf(m::MCMCGibbsModel, t::Int) = run_hdf(m, 1:t)
|
|
|
|
|
## -------------------------------------------------------------- ## |
|
#- One more useful function to concat my parallely obtained data -# |
|
## -------------------------------------------------------------- ## |
|
""" |
|
This function will combine an arbitrary number of *similar* h5 files |
|
into one. The notion of similar is that the names, sizes, and types of |
|
all variables are the same in all files. |
|
|
|
This function "concatenates" along the last dimension. For example |
|
if you have N files and one dataset has dimensions (i, j) then the |
|
resultant dataset will have dimensions (i, Nj) |
|
""" |
|
function combine_data{XX <: String}(main_fname::String, f_names::Vector{XX})
    # Stitch the *similar* HDF5 files in f_names into main_fname,
    # concatenating each dataset along its trailing (chain) dimension.
    # "Similar" means every file has the same dataset names, sizes, and
    # element types (see the docstring above).
    f = h5open(main_fname, "w")

    N_files = length(f_names)

    # use this file for reference
    f2 = h5open(f_names[1], "r")

    # extract useful info about objects from first file:
    # size, rank, and element type of every dataset
    nms = names(f2)
    info_dict = Dict()
    for nm in nms
        this_nm = Dict()
        sz = size(f2[nm])
        this_nm["size"] = sz
        this_nm["ndims"] = length(sz)
        # read a single element to learn the on-disk element type
        this_nm["eltype"] = eltype(f2[nm][ind2sub(sz, 1)...])
        info_dict[nm] = this_nm
    end

    close(f2)

    # loop over all names and stitch together the different files
    for nm in nms
        sz = info_dict[nm]["size"]
        nobs = sz[end] # How many observations in this dataset?
        inds = 1:nobs # will allow us to fill in later

        # construct data type and space, then dataset. The output's
        # trailing dim is N_files times the per-file trailing dim; one
        # per-file dataset is a natural chunk.
        dtype = datatype(info_dict[nm]["eltype"])
        dspace = dataspace(tuple(sz[1:end-1]..., N_files*sz[end]))
        d = d_create(f, nm, dtype, dspace, "chunk", sz)

        for fnm in f_names
            this_f = h5open(fnm, "r")
            # read the whole dataset and write it into this file's slot
            _save_hdf!(d,
                       this_f[nm][[Colon() for i=1:info_dict[nm]["ndims"]]...],
                       inds)
            # FIX: close each input file after reading. Previously the
            # handles leaked (one open file per name per input file).
            close(this_f)
            # shift the target range to the next file's slot
            inds += nobs
        end
    end
    close(f)
end