Skip to content

Instantly share code, notes, and snippets.

@mushkevych
Created May 11, 2012 20:35
Show Gist options
  • Save mushkevych/2662252 to your computer and use it in GitHub Desktop.
Save mushkevych/2662252 to your computer and use it in GitHub Desktop.
Exemplary R Reducer to illustrate basic principles of running R from Hadoop mapreduce
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.log4j.Logger;
import org.rosuda.JRI.REXP;
import org.rosuda.JRI.RMainLoopCallbacks;
import org.rosuda.JRI.Rengine;
import java.io.IOException;
/**
 * Exemplary reducer illustrating the basic principles of running R (through JRI)
 * from a Hadoop MapReduce job that writes to HBase.
 *
 * <p>Lifecycle: {@link #setup} starts the embedded R engine via {@link #initR(boolean)},
 * {@link #reduce} calls {@link #performRComputations()} for every incoming value, and
 * {@link #cleanup} shuts the engine down via {@link #cleanR()}.
 *
 * @author Bohdan Mushkevych
 * date: May 2012
 */
public class ExemplaryRReducer extends AbstractTableReducer<ImmutableBytesWritable, Writable> {
    private static final Logger log = Logger.getLogger(ExemplaryRReducer.class);

    /** Embedded R engine; assigned by {@link #initR(boolean)}, <code>null</code> until then. */
    protected Rengine re;

    /**
     * R console callbacks that forward R output to a log4j logger.
     * Interactive callbacks (console input, file chooser, history) are no-ops,
     * since the reducer runs unattended.
     */
    static class LoggingConsole implements RMainLoopCallbacks {
        private final Logger log;

        LoggingConsole(Logger log) {
            this.log = log;
        }

        public void rWriteConsole(Rengine re, String text, int oType) {
            log.info("*** R: callback rWriteConsole(" + text + ")");
        }

        public void rBusy(Rengine re, int which) {
            log.info("*** R: callback rBusy(" + which + ")");
        }

        public void rShowMessage(Rengine re, String message) {
            log.info("*** R: callback rShowMessage \"" + message + "\"");
        }

        public String rReadConsole(Rengine re, String prompt, int addToHistory) {
            // no interactive input is available
            return null;
        }

        public String rChooseFile(Rengine re, int newFile) {
            // no interactive file selection is available
            return null;
        }

        public void rFlushConsole(Rengine re) {
        }

        public void rLoadHistory(Rengine re, String filename) {
        }

        public void rSaveHistory(Rengine re, String filename) {
        }
    }

    /**
     * Evaluates an R expression and logs an error when the evaluation fails.
     * The engine signals a failed parse/evaluation by returning <code>null</code>
     * rather than by throwing, so the result has to be checked explicitly.
     *
     * @param expression R source to evaluate
     * @return evaluation result, or <code>null</code> if the evaluation failed
     */
    private REXP evalOrLog(String expression) {
        REXP result = re.eval(expression);
        if (result == null) {
            log.error("*** R: evaluation failed: " + expression);
        }
        return result;
    }

    /**
     * R Engine initialization is in a different method than *setup* to enable Unit Testing.
     * Starts the engine, makes sure the "reshape" package is available (installing it when
     * it is reported missing) and loads it. Package problems are logged, not thrown.
     *
     * @param runMainLoop if set to <code>true</code> the event loop will be started as soon as possible,
     * otherwise no event loop is started. Running loop requires <code>initialCallbacks</code> to be set correspondingly as well.
     * @throws IllegalStateException if the JRI Java classes do not match the native R library,
     * or if the engine cannot be started
     */
    public void initR(boolean runMainLoop) {
        // just making sure we have the right version of everything
        if (!Rengine.versionCheck()) {
            throw new IllegalStateException("*** R: version mismatch - Java files don't match R library version.");
        }

        // --vanilla Combine --no-save, --no-restore, --no-site-file, --no-init-file and --no-environ
        // --slave Make R run as quietly as possible
        // for more details run <code>R --help</code> from command line
        re = new Rengine(new String[]{"--vanilla", "--slave"}, runMainLoop, new LoggingConsole(log));

        // the engine starts R in a new thread, so we should wait until it's ready
        if (!re.waitForR()) {
            throw new IllegalStateException("*** R: cannot start the engine.");
        }

        try {
            // check if "reshape" package is installed
            evalOrLog("is.installed <- function(mypkg) is.element(mypkg, installed.packages()[,1])");
            REXP isInstalled = evalOrLog("is.installed(\"reshape\")");
            if (isInstalled == null || isInstalled.asBool() == null) {
                // the check itself failed or did not yield a logical value
                log.error("*** R: unable to determine whether reshape package is installed.");
            } else if (isInstalled.asBool().isFALSE()) {
                log.info("*** R: reshape package is missing. Installing locally.");
                // install "reshape" package if this is needed
                // NOTE(review): the package is fetched over plain HTTP from a fixed mirror - consider an HTTPS mirror
                evalOrLog("install.packages(\"reshape\", repos=\"http://cran.stat.sfu.ca/\")");
            } else {
                log.info("*** R: reshape package is installed. Proceeding.");
            }
        } catch (Exception e) {
            log.error("*** Exception during R initialization: ", e);
        }

        try {
            // load "reshape" package
            evalOrLog("library(reshape)");
        } catch (Exception e) {
            log.error("*** Exception while loading reshape package: ", e);
        }
    }

    /**
     * Starts the R engine (without the R event loop) before any reduce call.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        initR(false);
    }

    /**
     * method performs R computations.
     * Failures are logged and never propagated to the caller.
     */
    public void performRComputations() {
        try {
            // clear workspace before new processing round
            evalOrLog("rm(list=ls())");
            // perform some useful computations
            evalOrLog("N <- SOMETHING_USEFUL");
        } catch (Exception e) {
            log.error("*** Exception on R stage: ", e);
        }
    }

    /**
     * Runs the R computations once per incoming value and writes one Put per value
     * under the incoming key.
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Writable> values, Context context) throws IOException, InterruptedException {
        // aggregate records
        for (Writable value : values) {
            Result singleResult = (Result) value;
            // do something useful with singleResult

            // perform R computations
            performRComputations();

            // place computation results into HBase
            // remember to configure Put object properly
            Put put = new Put();
            context.write(key, put);
        }
    }

    /**
     * R Engine closure is in a separate method than *cleanup* to enable Unit Testing.
     * Does nothing when the engine was never created.
     */
    public void cleanR() {
        if (re == null) {
            return;
        }

        re.end();
        // after end() a false result from waitForR() is treated as "engine has stopped"
        if (!re.waitForR()) {
            log.info("*** R: engine is stopped.");
        } else {
            log.info("*** R: engine turned to zombie.");
        }
    }

    /**
     * Shuts the R engine down before delegating to the parent cleanup.
     */
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        cleanR();
        super.cleanup(context);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment