Last active
August 29, 2015 13:57
-
-
Save mschubert/9890462 to your computer and use it in GitHub Desktop.
Wrapper for the BatchJobs library that simplifies the interface and performs more checks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
library(stringr) | |
library(BatchJobs) | |
library(plyr) | |
# Rationale | |
# This script uses BatchJobs to run functions either locally, on multiple cores, or LSF, | |
# depending on your BatchJobs configuration. It has a simpler interface, does more error | |
# checking than the library itself, and is able to queue different function calls. The | |
# function supplied *MUST* be self-sufficient, i.e. load libraries and scripts. | |
# BatchJobs on the EBI cluster is already set up when using the gentoo prefix. | |
# | |
# Usage | |
# * Q() : create a new registry that vectorises a function call and optionally runs it
# * Qrun() : run all registries in the current working directory | |
# * Qget() : extract the results from the registry and return them
# * Qclean(): delete all registries in the current working directory | |
# * Qregs() : list all registries in the current working directory | |
# | |
# Examples | |
# > s = function(x) x | |
# > Q(s, x=c(1:3), get=T) | |
# returns list(1,2,3) | |
# | |
# > t = function(x) sum(x) | |
# > a = matrix(3:6, nrow=2) | |
# > Q(t, a) | |
# > Qrun() | |
# > Qget() | |
# splits a by columns, sums each column, and returns list(7, 11) | |
# | |
# TODO list | |
# * handle failed jobs? (e.g.: save layout to registry dir to rerun failed jobs) [rerun option?] | |
# * is there any reason to Q() w/o submitting the job? [chunking?] | |
# * is there any reason to separate Qrun() and Qget()? [rerunning failed jobs] | |
# * Qget(): warn when not all jobs are returned | |
# * Qregs(): possible that creation time does not follow call time? | |
# Registries created by Q() in this session. Q() appends to this list and
# Qregs(local=T) returns it as is.
.QLocalRegistries = list()
Q = function(` fun`, ..., more.args=list(), name=NULL, run=TRUE, get=FALSE,
             split.array.by=NA, expand.grid=FALSE, grid.sep=":", seed=123) {
    # Creates a new registry that vectorises a function call and optionally runs it
    # ` fun`        : the function to call
    # ...           : arguments to vectorise over
    # more.args     : arguments not to vectorise over
    # name          : the name of the function call if more than one are submitted
    # run           : execute the function, don't just queue
    # get           : returns the result of the run; implies run=T
    # split.array.by: how to split matrices/arrays in ... (default: last dimension)
    # expand.grid   : do every combination of arguments to vectorise over
    # grid.sep      : separator to use when assembling names from expand.grid
    # seed          : random seed for the function to run
    # @return       : the value of Qget(regs=reg) if get is set, invisible NULL otherwise

    # summarise arguments
    l. = list(...)
    fun = match.fun(` fun`)
    funargs = formals(fun)
    # a formal is required only if it has no default, i.e. it is the empty symbol;
    # a default that merely refers to another variable does not make it required
    isRequired = vapply(funargs,
                        function(f) is.name(f) && !nzchar(as.character(f)),
                        logical(1))
    required = names(funargs)[isRequired]
    provided = names(c(l., more.args))

    # perform checks that BatchJobs doesn't do
    if ('reg' %in% provided || 'fun' %in% provided)
        stop("'reg' and 'fun' are reserved and thus not allowed as argument to ` fun`")
    if (any(grepl("^ ", provided)))
        stop("Arguments starting with space are not allowed")
    if (expand.grid && length(l.) == 1)
        stop("Can not expand.grid on one vector")
    if (length(provided) > 1) {
        if (any(nchar(provided) == 0))
            stop("All arguments that will be provided to function must be named")
        # '...' is a formal without default but never has to be supplied by name
        sdiff = setdiff(required, c(provided, '...'))
        if (length(sdiff) > 0)
            stop(paste("Argument required but not provided:", paste(sdiff, collapse=" ")))
    }

    sdiff = setdiff(provided, names(funargs))
    if (length(sdiff) > 0 && ! '...' %in% names(funargs))
        stop(paste("Argument provided but not accepted by function:",
                   paste(sdiff, collapse=" ")))

    dups = duplicated(provided)
    if (any(dups))
        stop(paste("Argument duplicated:", paste(provided[dups], collapse=" ")))

    # convert matrices to lists so they can be vectorised over
    split_mat = function(X) {
        if (is.array(X) && length(dim(X)) > 1) {
            margin = if (is.na(split.array.by)) length(dim(X)) else split.array.by
            setNames(alply(X, margin), dimnames(X)[[margin]])
        } else
            X
    }
    l. = lapply(l., split_mat)

    # name every vector so we can identify them afterwards
    ln = lapply(l., names)
    lnFull = lapply(seq_along(ln), function(i)
        if (is.null(ln[[i]])) seq_along(l.[[i]])
        else ln[[i]])

    # registry id and directory share a fresh name relative to the working directory
    tmpdir = basename(tempfile(pattern="QRtmp", tmpdir="."))
    reg = makeRegistry(id=tmpdir, file.dir=tmpdir, seed=seed)

    # fill the registry with function calls, save names as well
    if (expand.grid)
        do.call(batchExpandGrid, c(list(reg=reg, fun=fun, more.args=more.args), l.))
    else
        do.call(batchMap, c(list(reg=reg, fun=fun, more.args=more.args), l.))

    # NOTE(review): without names this enumerates the full grid of indices even
    # when expand.grid is off; with several vectorised arguments that presumably
    # yields more names than jobs - confirm this is intended
    if (expand.grid || is.null(unlist(ln)))
        resultNames = apply(expand.grid(lnFull), 1, function(x) paste(x,collapse=grid.sep))
    else
        resultNames = as.matrix(apply(do.call(cbind, ln), 1, unique))
    # write next to the registry without changing the working directory
    save(resultNames, name, file=file.path(tmpdir, "names.RData"))

    # remember the registry for this session (unnamed if no name was given)
    assign('.QLocalRegistries', c(.QLocalRegistries, setNames(list(reg), name)),
           envir=parent.env(environment()))

    # get implies run: there is nothing to collect from jobs that were never submitted
    if (run || get)
        Qrun(regs=reg)
    if (get)
        Qget(regs=reg)
    else
        invisible(NULL)
}
Qrun = function(n.chunks=NULL, chunk.size=NULL, shuffle=TRUE, regs=Qregs()) {
    # Runs all registries in the current working directory
    # n.chunks  : number of chunks (cores, LSF jobs) to split each registry into
    # chunk.size: number of calls to put into one core/LSF job (do not use with n.chunks)
    # shuffle   : if chunking, shuffle the order of calls
    # regs      : a single registry or a list of registries to submit
    if (!is.null(n.chunks) && !is.null(chunk.size))
        stop("Can not take both n.chunks and chunk.size")

    # accept a bare registry; inherits() also works if the object has several classes
    if (inherits(regs, 'Registry'))
        regs = list(regs)

    for (reg in regs) {
        ids = getJobIds(reg)
        if (!is.null(n.chunks))
            ids = chunk(ids, n.chunks=n.chunks, shuffle=shuffle)
        if (!is.null(chunk.size))
            ids = chunk(ids, chunk.size=chunk.size, shuffle=shuffle)
        submitJobs(reg, ids)
    }
}
Qget = function(clean=TRUE, regs=Qregs()) {
    # Extracts the results from the registry and returns them
    # regs   : a single registry or a list of registries to include
    # clean  : delete the registry when done
    # @return: a list with one entry per registry, each a list of results of the
    #          function called with different arguments
    if (inherits(regs, 'Registry'))
        regs = list(regs)

    getResult = function(reg) {
        waitForJobs(reg)
        result = reduceResultsList(reg, fun=function(job, res) res)

        # names.RData is written by Q() into the registry directory; read it from
        # there (the same path that is deleted below) into its own environment
        stored = new.env()
        load(file.path(reg$file.dir, 'names.RData'), envir=stored)
        resultNames = stored[['resultNames']]

        if (clean)
            unlink(reg$file.dir, recursive=TRUE)

        # results are keyed by job id, which indexes into the saved names
        setNames(result, resultNames[as.integer(names(result))])
    }

    setNames(lapply(regs, getResult), names(regs))
}
Qclean = function(regs=Qregs()) {
    # Deletes all registries in the current working directory
    # regs: a single registry or a list of registries to include
    # accept a bare registry; inherits() also works if the object has several classes
    if (inherits(regs, 'Registry'))
        regs = list(regs)

    # removes each registry's directory on disk
    for (reg in regs)
        unlink(reg$file.dir, recursive=TRUE)
}
Qregs = function(name=".*", directory="QRtmp[0-9a-zA-Z]+", local=TRUE) {
    # Lists all registries in the current working directory
    # name     : regular expression specifying the registry name
    # directory: regular expression specifying the directories to look for registries
    # local    : only return registries created in this session
    # @return  : a list of registry objects, ordered by the modification time of
    #            their registry.RData when read from disk
    if (local)
        return(.QLocalRegistries)

    # only keep matching directories that actually contain a registry
    regdirs = list.files(pattern=directory, include.dirs=TRUE)
    regdirs = regdirs[file.exists(file.path(regdirs, 'registry.RData'))]
    if (length(regdirs) == 0) return(list())

    # oldest registry first
    mtimes = file.info(file.path(regdirs, 'registry.RData'))$mtime
    regdirs = regdirs[order(as.POSIXct(mtimes))]

    getRegistry = function(rdir) {
        # load into a private environment so that nothing leaks into (or is
        # accidentally picked up from) this function, and without touching the
        # working directory
        loaded = new.env()
        load(file.path(rdir, 'registry.RData'), envir=loaded)  # reg
        namesfile = file.path(rdir, 'names.RData')
        if (file.exists(namesfile))
            load(namesfile, envir=loaded)  # resultNames, name
        regname = loaded[['name']]
        if (is.null(regname))
            regname = ""  # registry was created without a name
        list(as.character(regname)[1], loaded[['reg']])
    }

    regs = lapply(regdirs, getRegistry)
    regs = setNames(lapply(regs, function(x) x[[2]]),
                    vapply(regs, function(x) x[[1]], character(1)))
    regs[grepl(name, names(regs))]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment