Skip to content

Instantly share code, notes, and snippets.

@stephenjbarr
Created August 30, 2012 04:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stephenjbarr/3522278 to your computer and use it in GitHub Desktop.
Save stephenjbarr/3522278 to your computer and use it in GitHub Desktop.
why does the MPI section fail
// -*- mode: C++; c-indent-level: 4; c-basic-offset: 4; tab-width: 8; -*-
//
// SJB - first shot at RInside and Eigen combined
//
// Copyright (C) 2012 Stephen J. Barr
//
// GPL'ed
#include <iostream>
#include <sstream>
#include <iomanip>
#include <fstream>
#include "mkl.h"
#include "math.h"
#include <vector>
#include <cmath>
#include <string>
#include <cstdlib>
#include <fcntl.h>
#include <sys/stat.h>
#include <ctype.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <getopt.h>
#include <map>
#include <utility>
//////////////////////////////
// INCLUDE THE GSL PACKAGES //
//////////////////////////////
#include <gsl/gsl_math.h>
#include <gsl/gsl_ieee_utils.h>
#include <gsl/gsl_multimin.h>
#include "mpi.h"
#include <RInside.h> // for the embedded R via RInside
#include <Rcpp.h>
#include <RcppEigen.h>
using namespace Rcpp;
using namespace Eigen;
using namespace std;
struct task_param {
int N_TASKS;
int START_ROW;
int END_ROW;
};
struct solver_parameters {
MatrixXd ua;
MatrixXd ub;
MatrixXd xmat;
MatrixXd ymat;
task_param TP;
int N_params;
};
double SolveEnergy(const gsl_vector *, void *);
double nll_mpi( const VectorXd& theta,
const MatrixXd& ua,
const MatrixXd& ub,
const MatrixXd& Xmat,
const MatrixXd& Ymat,
task_param TP);
int min(int a, int b) {
return ((a < b)? a : b);
}
double stdnormpdf(double x) {
return( (1.0 / sqrt(2.0 * PI))*exp(-0.5*x*x));
}
VectorXd dnorm(VectorXd x, VectorXd means, VectorXd sigmas) {
VectorXd scaler = (sigmas.array() * sqrt(2.0 * PI)).array().pow(-1.0);
VectorXd coefs = -0.5 * (( (x.array() - means.array())/sigmas.array() ).array().pow(2.0));
VectorXd dens = scaler.array() * coefs.array().exp();
return(dens);
}
VectorXd dnorm(VectorXd x, VectorXd means, double sigma) {
double scaler = 1.0/(sigma * sqrt(2.0 * PI));
VectorXd coefs = -0.5 * (( (x.array() - means.array())/sigma ).array().pow(2.0));
VectorXd dens = scaler * coefs.array().exp();
return(dens);
}
// given a row_i, and some matrices, calculate the negative log likelihood
// for the particular row_i (firm_i)
double nll_singleton(int row_i,
const VectorXd& theta,
const MatrixXd& ua,
const MatrixXd& ub,
const MatrixXd& Xmat,
const MatrixXd& Ymat) {
// NOTE: put some asserts here to check the agreement of the matrices
int n = ua.rows();
int nr = ua.cols();
double air, bir;
VectorXd muir;
double li = 0.0;
for(int r = 0; r < nr; ++r) {
air = theta(0) + theta(1)*ua(row_i, r);
bir = theta(2) + theta(3)*ua(row_i, r);
muir = bir * Xmat.row(row_i);
muir.array() += air;
VectorXd densities = dnorm(Ymat.row(row_i), muir, 1.0);
// cout << row_i << "," << r << ":: " << densities.transpose() << endl;
li += densities.prod();
}
// cout << "Row " << row_i << ": " << li << endl;
return log(li/double(nr));
}
double nll_mpi( const VectorXd& theta,
const MatrixXd& ua,
const MatrixXd& ub,
const MatrixXd& Xmat,
const MatrixXd& Ymat,
task_param TP
) {
const int root = 0;
int n = ua.rows();
int nr = ua.cols();
int rank;
int size;
int remaining_tasks = TP.N_TASKS;
rank = MPI::COMM_WORLD.Get_rank();
size = MPI::COMM_WORLD.Get_size();
int row_ary[2];
int * row_ary_ptr = row_ary;
int START_ROW = TP.START_ROW;
int END_ROW = TP.END_ROW;
int Nslaves = size - 1;
int cur_task_size;
double TOTAL_NLL_SUM[1];
double * recv_buffer = TOTAL_NLL_SUM;
double NLL_SUBTOTAL[1];
double * send_buffer = NLL_SUBTOTAL;
if(rank == 0) {
// SUBDIVIDE THE MATRICES INTO ROWS
for(int ii = 1; ii <= Nslaves; ++ii) {
cur_task_size = ( (remaining_tasks > (2 * (n/Nslaves)))) ?
n/Nslaves : remaining_tasks;
row_ary[START_ROW] = (ii-1)*(n/Nslaves);
row_ary[END_ROW] = ((ii-1)*(n/Nslaves) + cur_task_size - 1);
remaining_tasks -= cur_task_size;
// MPI_send to rank ii the start and end row
MPI_Send(row_ary_ptr, 2, MPI_INT, ii, 0, MPI::COMM_WORLD);
}
} else {
// RECEIVE ROW ASSIGNMENT, CALCULATE NLL FOR ROW GROUP
// RECEIVE AND OUTPUT ASSIGNMENT
MPI_Recv(row_ary_ptr, 2, MPI_INT, root, 0, MPI::COMM_WORLD, NULL);
cur_task_size = row_ary_ptr[END_ROW] - row_ary_ptr[START_ROW] + 1;
cout << "Workstation " << rank << " gets " << cur_task_size;
cout << " " << "Firms: " << row_ary_ptr[START_ROW] << " to " << row_ary_ptr[END_ROW] << endl;
// FOR EACH ROW, CALCULATE THE NLL
double total_sub_nll = 0.0;
double ll = 0.0;
for(int rii = row_ary_ptr[START_ROW];
rii <= row_ary_ptr[END_ROW];
++rii) {
ll = nll_singleton(rii, theta, ua, ub, Xmat, Ymat);
total_sub_nll += ll;
if(rii < 0) {
cout << "========================================" << endl;
cout << "rii: " << rii << endl;
cout << "ua: " << ua.row(rii) << endl;
cout << "ub: " << ub.row(rii) << endl;
cout << "Xmat: " << Xmat.row(rii) << endl;
cout << "Ymat: " << Ymat.row(rii) << endl;
}
// cout << "ll: " << ll << endl;
}
total_sub_nll *= -1.0;
cout << "Workstation " << rank << " calcs " << total_sub_nll << endl;
send_buffer[0] = total_sub_nll;
}
MPI_Reduce(send_buffer, recv_buffer, 1, MPI_DOUBLE,
MPI_SUM, root, MPI::COMM_WORLD);
if(rank == 0) {
cout << "TOTAL NLL: " << recv_buffer[0] << endl;
}
// MPI_Barrier(MPI::COMM_WORLD);
return recv_buffer[0];
}
double nll_nompi( const VectorXd& theta,
const MatrixXd& ua,
const MatrixXd& ub,
const MatrixXd& Xmat,
const MatrixXd& Ymat,
task_param TP
) {
const int root = 0;
int n = ua.rows();
int nr = ua.cols();
int rank;
int size;
int remaining_tasks = TP.N_TASKS;
// FOR EACH ROW, CALCULATE THE NLL
double total_sub_nll = 0.0;
double ll = 0.0;
for(int rii = 0;
rii < ua.rows();
++rii) {
ll = nll_singleton(rii, theta, ua, ub, Xmat, Ymat);
total_sub_nll += ll;
if(rii < 0) {
cout << "========================================" << endl;
cout << "rii: " << rii << endl;
cout << "ua: " << ua.row(rii) << endl;
cout << "ub: " << ub.row(rii) << endl;
cout << "Xmat: " << Xmat.row(rii) << endl;
cout << "Ymat: " << Ymat.row(rii) << endl;
}
// cout << "ll: " << ll << endl;
}
total_sub_nll *= -1.0;
return total_sub_nll;
}
// SolveEnergy
// the SolveEnergy function is a gsl_multimin_fminimizer
// compliant function that calls the negative log likelihood
// function
double SolveEnergy(const gsl_vector *v, void *param_ptr) {
solver_parameters *psl = (solver_parameters *) param_ptr;
VectorXd theta = VectorXd(psl->N_params);
for(int ii = 0; ii < psl->N_params; ii++) {
theta(ii) = gsl_vector_get(v, ii);
}
double obj_fn_val = nll_mpi(theta, psl->ua, psl->ub,
psl->xmat, psl->ymat,
psl->TP);
cout << "OBJ: " << obj_fn_val << endl;
return obj_fn_val;
}
// SolveEnergy
// the SolveEnergy function is a gsl_multimin_fminimizer
// compliant function that calls the negative log likelihood
// function
double SolveEnergy_nompi(const gsl_vector *v, void *param_ptr) {
solver_parameters *psl = (solver_parameters *) param_ptr;
VectorXd theta = VectorXd(psl->N_params);
for(int ii = 0; ii < psl->N_params; ii++) {
theta(ii) = gsl_vector_get(v, ii);
}
double obj_fn_val = nll_nompi(theta, psl->ua, psl->ub,
psl->xmat, psl->ymat,
psl->TP);
cout << "OBJ: " << obj_fn_val << endl;
return obj_fn_val;
}
VectorXd gsl_nelder(VectorXd initial, int max_iterations,
solver_parameters* solve_pars,
double TOLERANCE = 0.00001,
double STEPSIZE = 0.01) {
int MODEL_VALUE_COUNT = initial.rows();
cout << "gsl_nelder: " << "Initialized with count: " << MODEL_VALUE_COUNT << endl;
/* Set IEEE floating point mode from environment */
gsl_ieee_env_setup();
/* Get set up for Nelder Mead */
const gsl_multimin_fminimizer_type *T = gsl_multimin_fminimizer_nmsimplex2;
gsl_multimin_fminimizer *S = NULL;
gsl_vector *StartVec;
gsl_vector *StepVec;
gsl_multimin_function MinexFunc;
int Status;
double Size;
cout << "GSL: " << "Initializing Start Vec" << endl;
StartVec = gsl_vector_alloc(MODEL_VALUE_COUNT);
for (int i = 0; i < MODEL_VALUE_COUNT; i++)
{
gsl_vector_set(StartVec, i, initial[i]);
}
/* Set initial step size to 0.1 */
StepVec = gsl_vector_alloc(MODEL_VALUE_COUNT);
gsl_vector_set_all(StepVec, STEPSIZE);
/* Initialize method */
cout << "GSL: " << "Initializing Method" << endl;
MinexFunc.n = MODEL_VALUE_COUNT;
MinexFunc.f = &SolveEnergy;
MinexFunc.params = solve_pars;
cout << "GSL: " << "Starting minimizer" << endl;
S = gsl_multimin_fminimizer_alloc(T, MODEL_VALUE_COUNT);
cout << "GSL: passed alloc" << endl;
gsl_multimin_fminimizer_set(S, &MinexFunc, StartVec, StepVec);
cout << "GSL: passed set" << endl;
int Iter = 0;
do
{
cout << "ITER: " << Iter << endl;
Iter++;
Status = gsl_multimin_fminimizer_iterate(S);
if (Status)
break;
Size = gsl_multimin_fminimizer_size(S);
Status = gsl_multimin_test_size(Size, 1e-2);
if (Status == GSL_SUCCESS)
{
printf ("converged to minimum\n");
}
}
while (Status == GSL_CONTINUE && Iter < max_iterations);
VectorXd result = VectorXd(MODEL_VALUE_COUNT);
for(int jj = 0; jj < MODEL_VALUE_COUNT; jj++) {
result(jj) = gsl_vector_get(S->x, jj);
}
return result;
}
VectorXd gsl_nelder2(VectorXd initial, int max_iterations,
solver_parameters* solve_pars,
double TOLERANCE = 0.00001,
double STEPSIZE = 0.01) {
int MODEL_VALUE_COUNT = initial.rows();
cout << "gsl_nelder2: " << "Initialized with count: " << MODEL_VALUE_COUNT << endl;
/* Set IEEE floating point mode from environment */
gsl_ieee_env_setup();
/* Get set up for Nelder Mead */
const gsl_multimin_fminimizer_type *T = gsl_multimin_fminimizer_nmsimplex2;
gsl_multimin_fminimizer *S = NULL;
gsl_vector *StartVec;
gsl_vector *StepVec;
gsl_multimin_function MinexFunc;
int Status;
double Size;
cout << "GSL: " << "Initializing Start Vec" << endl;
StartVec = gsl_vector_alloc(MODEL_VALUE_COUNT);
for (int i = 0; i < MODEL_VALUE_COUNT; i++)
{
gsl_vector_set(StartVec, i, initial[i]);
}
/* Set initial step size to 0.1 */
StepVec = gsl_vector_alloc(MODEL_VALUE_COUNT);
gsl_vector_set_all(StepVec, STEPSIZE);
/* Initialize method */
cout << "GSL: " << "Initializing Method" << endl;
MinexFunc.n = MODEL_VALUE_COUNT;
MinexFunc.f = &SolveEnergy_nompi;
MinexFunc.params = solve_pars;
cout << "GSL: " << "Starting minimizer" << endl;
S = gsl_multimin_fminimizer_alloc(T, MODEL_VALUE_COUNT);
cout << "GSL: passed alloc" << endl;
gsl_multimin_fminimizer_set(S, &MinexFunc, StartVec, StepVec);
cout << "GSL: passed set" << endl;
int Iter = 0;
do
{
cout << "ITER: " << Iter << endl;
Iter++;
Status = gsl_multimin_fminimizer_iterate(S);
if (Status)
break;
Size = gsl_multimin_fminimizer_size(S);
Status = gsl_multimin_test_size(Size, 1e-2);
if (Status == GSL_SUCCESS)
{
printf ("converged to minimum\n");
}
}
while (Status == GSL_CONTINUE && Iter < max_iterations);
VectorXd result = VectorXd(MODEL_VALUE_COUNT);
for(int jj = 0; jj < MODEL_VALUE_COUNT; jj++) {
result(jj) = gsl_vector_get(S->x, jj);
}
return result;
}
int main(int argc, char *argv[]) {
const int N_TRU_PARAMS = 5;
const int n = 200;
const int t = 10;
const int nr = 5;
RInside R(argc, argv); // create an embedded R instance
stringstream ss;
ss << "n = " << n << "; t = " << t << ";" << " nr = " << nr << ";";
cout << ss.str() << endl;
R.parseEval(ss.str());
VectorXd tru = VectorXd(N_TRU_PARAMS);
tru << 2,1,-2,1,1;
// R.parseEval("n = 100;");
// R.parseEval("t = 100;");
R.parseEval("set.seed(123)");
string cmdstr = "tru = c(2,1,-2,1,1);"
"set.seed(123);"
"a = rnorm(n,tru[1],tru[2]);"
"b = rnorm(n,tru[3],tru[4]);"
// Make some data lists
"x = NULL;"
"y = NULL;"
// Generate some data for each firm
"for(i in 1:n) {"
" x[[i]] = rnorm(t,1,1);"
" y[[i]] = a[i]+b[i]*x[[i]]+ rnorm(t)*tru[5];"
"}";
R.parseEval(cmdstr);
SEXP Xmat_sexp = R.parseEval("Xmat = do.call(rbind, x)");
const Map<MatrixXd> Xmat(as<Map<MatrixXd> >(Xmat_sexp));
SEXP Ymat_sexp = R.parseEval("Ymat = do.call(rbind, y)");
const Map<MatrixXd> Ymat(as<Map<MatrixXd> >(Ymat_sexp));
cout << "X: " << Xmat.rows() << " x " << Xmat.cols() << endl;
cout << "Y: " << Ymat.rows() << " x " << Ymat.cols() << endl;
// Make matrices ua and ub
SEXP ua_sexp = R.parseEval( "ua = matrix(rnorm(nr*n),n,nr)");
const Map<MatrixXd> ua(as<Map<MatrixXd> >(ua_sexp));
SEXP ub_sexp = R.parseEval( "ub = matrix(rnorm(nr*n),n,nr)");
const Map<MatrixXd> ub(as<Map<MatrixXd> >(ub_sexp));
cmdstr = "track = 0;"
"Y = unlist(y);"
"X = unlist(x);"
"start = lm(Y~X)$coeff;"
"start =c(start[1],0,start[2],0);";
R.parseEval(cmdstr);
SEXP start_sexp = R.parseEval("start");
const Map<VectorXd> start_vector(as<Map<VectorXd> >(start_sexp));
cout << "Starting point: " << start_vector.transpose() << endl;
////////////////////////////////////////////////////////////
// THE FOLLOWING DISCUSSION IS DEPRECATED. USING GSL
// ========================================
// EXPOSE THE NLL FUNCTION
//
// We want to use R's optimization routine to solve this. In order
// to do this, we want to expose the C++ negative log likelihood (nll)
// function to R. To do this, we use the command
// Rcpp::InternalFunction( &functionName )
//
// For a simple example of this, see:
// RInside/inst/examples/standard/rinside_sample9.cpp
// R["nll"] = Rcpp::InternalFunction( &nll_singleton );
////////////////////////////////////////////////////////////
// THERE ARE TWO PAIRS OF FUNCTIONS, smle_init and smle_nll_mpi
// smle_init: distributes, using mpi broadcast, the necessary
// data to all machines.
//
// smle_nll_mpi: assuming smle_init has been successfully completed
// compute the negative log likelihood using MPI
//
// for now, these functions will be written out in the code
// MPI INITIALIZATION
int rank, size;
const int root = 0;
cout << "Before init" << endl;
int remaining_tasks = n;
const int START_ROW = 0;
const int END_ROW = 1;
task_param TASK_PARAM;
TASK_PARAM.N_TASKS = n;
TASK_PARAM.START_ROW = START_ROW;
TASK_PARAM.END_ROW = END_ROW;
solver_parameters SP;
SP.ua = ua;
SP.ub = ub;
SP.xmat = Xmat;
SP.ymat = Ymat;
SP.TP = TASK_PARAM;
SP.N_params = (N_TRU_PARAMS-1); // -1 since we are not using the last one
nll_nompi(start_vector, ua, ub, Xmat, Ymat, TASK_PARAM);
/////////////////////////////
// TESTING GSL WITHOUT MPI //
/////////////////////////////
// gsl_vector *StartVec;
// StartVec = gsl_vector_alloc(SP.N_params);
// int i;
// for(i = 0; i < SP.N_params; ++i) {
// gsl_vector_set(StartVec, i, start_vector(i));
// cout << i << ": "<< gsl_vector_get(StartVec,i) << endl;
// }
// cout << "SolveEnergy_nompi: " << SolveEnergy_nompi(StartVec, &SP) << endl;
// // BEGIN THE OPTIMIZER
// VectorXd output = gsl_nelder2(start_vector, 200, &SP,
// 0.00021209,
// 0.202778);
// cout << "OUTPUT: " << output << endl;
// THIS WORKS
////////////////////////////////////////////////////////////
/////////////////////
// THE MPI SECTION //
/////////////////////
MPI::Init();
// Next, use GSL to optimize over this
// nll_mpi(start_vector, ua, ub, Xmat, Ymat, TASK_PARAM);
rank = MPI::COMM_WORLD.Get_rank();
if(rank == root) {
//////////////////// Populate the StartVec
gsl_vector *StartVec;
StartVec = gsl_vector_alloc(SP.N_params);
int i;
for(i = 0; i < SP.N_params; ++i) {
gsl_vector_set(StartVec, i, start_vector(i));
cout << i << ": "<< gsl_vector_get(StartVec,i) << endl;
}
// cout << "OUTPUT: " << SolveEnergy(StartVec, &SP) << endl;
// BEGIN THE OPTIMIZER
VectorXd output = gsl_nelder(start_vector, 200, &SP,
0.00021209,
0.202778);
cout << "OUTPUT: " << output << endl;
}
MPI::Finalize();
// END MPI SECTION
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment