Skip to content

Instantly share code, notes, and snippets.

@mschubert
Last active August 29, 2015 13:57
Show Gist options
  • Save mschubert/9890462 to your computer and use it in GitHub Desktop.
Save mschubert/9890462 to your computer and use it in GitHub Desktop.
Wrapper for the BatchJobs library that simplifies the interface and performs more checks
library(stringr)
library(BatchJobs)
library(plyr)
# Rationale
# This script uses BatchJobs to run functions either locally, on multiple cores, or LSF,
# depending on your BatchJobs configuration. It has a simpler interface, does more error
# checking than the library itself, and is able to queue different function calls. The
# function supplied *MUST* be self-sufficient, i.e. load libraries and scripts.
# BatchJobs on the EBI cluster is already set up when using the gentoo prefix.
#
# Usage
# * Q()     : create a new registry that vectorises a function call and optionally run it
# * Qrun()  : run all registries created in this session (see Qregs())
# * Qget()  : extract the results from the registries and return them
# * Qclean(): delete all registries created in this session
# * Qregs() : list registries (this session's, or all in the current working directory)
#
# Examples
# > s = function(x) x
# > Q(s, x=c(1:3), get=T)
# returns list(1,2,3)
#
# > t = function(x) sum(x)
# > a = matrix(3:6, nrow=2)
# > Q(t, a)
# > Qrun()
# > Qget()
# splits a by columns, sums each column, and returns list(7, 11)
#
# TODO list
# * handle failed jobs? (e.g.: save layout to registry dir to rerun failed jobs) [rerun option?]
# * is there any reason to Q() w/o submitting the job? [chunking?]
# * is there any reason to separate Qrun() and Qget()? [rerunning failed jobs]
# * Qget(): warn when not all jobs are returned
# * Qregs(): possible that creation time does not follow call time?
# Registries created by Q() in this session; Qregs(local=TRUE) returns this list
.QLocalRegistries <- list()
Q = function(` fun`, ..., more.args=list(), name=NULL, run=TRUE, get=FALSE,
        split.array.by=NA, expand.grid=FALSE, grid.sep=":", seed=123) {
    # Creates a new registry that vectorises a function call and optionally runs it
    # ` fun`        : the function to call; it must be self-sufficient (load its own
    #                 libraries and scripts)
    # ...           : arguments to vectorise over
    # more.args     : arguments not to vectorise over
    # name          : the name of the function call if more than one are submitted
    # run           : execute the function, don't just queue
    # get           : return the result of the run; implies run=TRUE
    # split.array.by: how to split matrices/arrays in ... (default: last dimension)
    # expand.grid   : do every combination of arguments to vectorise over
    # grid.sep      : separator to use when assembling result names
    # seed          : random seed for the function to run
    # @return       : if get, the list of results of this call (named by argument
    #                 names or indices); otherwise invisible NULL

    # summarise arguments
    l. = list(...)
    fun = match.fun(` fun`)
    funargs = formals(fun)
    # an argument is required only if it has no default at all (its formal is the
    # empty symbol); a default that happens to be a symbol (y = x) is not required.
    # '...' also has no default and is excluded from the check below
    required = names(funargs)[vapply(funargs, function(f)
        is.symbol(f) && identical(as.character(f), ""), logical(1))]
    provided = names(c(l., more.args))

    # perform checks that BatchJobs doesn't do
    if ('reg' %in% provided || 'fun' %in% provided)
        stop("'reg' and 'fun' are reserved and thus not allowed as argument to ` fun`")
    if (any(grepl("^ ", provided)))
        stop("Arguments starting with space are not allowed")
    if (expand.grid && length(l.) == 1)
        stop("Can not expand.grid on one vector")
    if (length(provided) > 1) {
        if (any(nchar(provided) == 0))
            stop("All arguments that will be provided to function must be named")
        missing.args = setdiff(setdiff(required, provided), '...')
        if (length(missing.args) > 0)
            stop(paste("Argument required but not provided:",
                       paste(missing.args, collapse=" ")))
    }
    unknown.args = setdiff(provided, names(funargs))
    if (length(unknown.args) > 0 && ! '...' %in% names(funargs))
        stop(paste("Argument provided but not accepted by function:",
                   paste(unknown.args, collapse=" ")))
    dups = duplicated(provided)
    if (any(dups))
        stop(paste("Argument duplicated:", paste(unique(provided[dups]), collapse=" ")))

    # convert matrices/arrays to lists so they can be vectorised over
    split_mat = function(X) {
        if (!is.array(X) || length(dim(X)) < 2)
            return(X)
        margin = if (is.na(split.array.by)) length(dim(X)) else split.array.by
        setNames(alply(X, margin), dimnames(X)[[margin]])
    }
    l. = lapply(l., split_mat)

    # name every vector so we can identify them afterwards
    ln = lapply(l., names)
    lnFull = lapply(seq_along(ln), function(i)
        if (is.null(ln[[i]])) seq_along(l.[[i]]) else ln[[i]])

    # tempfile() only supplies a fresh name; basename() drops the "./" prefix
    tmpdir = basename(tempfile(pattern="QRtmp", tmpdir="."))
    reg = makeRegistry(id=tmpdir, file.dir=tmpdir, seed=seed)

    # fill the registry with function calls, save names as well
    if (expand.grid)
        do.call(batchExpandGrid, c(list(reg=reg, fun=fun, more.args=more.args), l.))
    else
        do.call(batchMap, c(list(reg=reg, fun=fun, more.args=more.args), l.))
    if (expand.grid)
        # paste column-wise: apply() would coerce mixed columns to padded strings
        resultNames = do.call(paste, c(base::expand.grid(lnFull), sep=grid.sep))
    else if (is.null(unlist(ln)))
        # zipped call without names: label call i by the i-th index of each argument
        resultNames = do.call(paste, c(lnFull, sep=grid.sep))
    else
        resultNames = as.matrix(apply(do.call(cbind, ln), 1, unique))
    # write next to the registry without changing the working directory
    save(resultNames, name, file=file.path(tmpdir, "names.RData"))

    assign('.QLocalRegistries', c(.QLocalRegistries, setNames(list(reg), name)),
        envir=parent.env(environment()))
    # get implies run: waiting for jobs that were never submitted is pointless
    if (run || get)
        Qrun(regs=reg)
    if (get)
        return(Qget(regs=reg)[[1]])
    invisible(NULL)
}
Qrun = function(n.chunks=NULL, chunk.size=NULL, shuffle=TRUE, regs=Qregs()) {
    # Submits all jobs of the given registries, optionally grouped into chunks
    # n.chunks  : number of chunks (cores, LSF jobs) to split each registry into
    # chunk.size: number of calls to put into one core/LSF job (do not use with n.chunks)
    # shuffle   : if chunking, shuffle the order of calls
    # regs      : a single Registry or a list of them (default: Qregs())
    # @return   : invisible NULL; called for the side effect of submitting jobs
    if (!is.null(n.chunks) && !is.null(chunk.size))
        stop("Can not take both n.chunks and chunk.size")
    # inherits() rather than class()==: class() may have more than one element
    if (inherits(regs, 'Registry'))
        regs = list(regs)
    for (reg in regs) {
        ids = getJobIds(reg)
        if (!is.null(n.chunks))
            ids = chunk(ids, n.chunks=n.chunks, shuffle=shuffle)
        else if (!is.null(chunk.size))
            ids = chunk(ids, chunk.size=chunk.size, shuffle=shuffle)
        submitJobs(reg, ids)
    }
    invisible(NULL)
}
Qget = function(clean=TRUE, regs=Qregs()) {
    # Waits for the jobs of the given registries and returns their results
    # clean  : delete each registry directory once its results have been read
    # regs   : a single Registry or a list of registries to include
    # @return: a list with one element per registry (carrying the names of regs),
    #          each a list of the results of the function called with different
    #          arguments, named by the labels Q() stored in names.RData
    if (inherits(regs, 'Registry'))
        regs = list(regs)
    getResult = function(reg) {
        waitForJobs(reg)
        result = reduceResultsList(reg, fun=function(job, res) res)
        # read names.RData from reg$file.dir: the directory Q() wrote it into and
        # the one removed below (reg$id only coincides with it for Q() registries).
        # A private environment keeps the stored objects from shadowing locals.
        stored = new.env()
        load(file.path(reg$file.dir, 'names.RData'), envir=stored)
        if (clean)
            unlink(reg$file.dir, recursive=TRUE)
        setNames(result, stored$resultNames[as.integer(names(result))])
    }
    setNames(lapply(regs, getResult), names(regs))
}
Qclean = function(regs=Qregs()) {
    # Deletes the directories of the given registries
    # regs   : a single Registry or a list of registries (default: Qregs())
    # @return: invisible NULL
    if (inherits(regs, 'Registry'))
        regs = list(regs)
    for (reg in regs)
        unlink(reg$file.dir, recursive=TRUE)
    invisible(NULL)
}
Qregs = function(name=".*", directory="QRtmp[0-9a-zA-Z]+", local=TRUE) {
    # Lists registries, either those created in this session or those on disk
    # name     : regular expression the registry name must match
    # directory: regular expression for registry directories in the current
    #            working directory (only used when local=FALSE)
    # local    : only return registries created in this session by Q()
    # @return  : a list of registry objects named by the name given to Q()
    #            (registries read from disk without a name get "")
    if (local) {
        regs = .QLocalRegistries
    } else {
        regdirs = list.files(pattern=directory, include.dirs=TRUE)
        # skip plain files and directories that do not hold a complete Q() registry
        complete = dir.exists(regdirs) &
            file.exists(file.path(regdirs, "registry.RData")) &
            file.exists(file.path(regdirs, "names.RData"))
        regdirs = regdirs[complete]
        # oldest first, by modification time of the registry file
        mtime = file.info(file.path(regdirs, "registry.RData"))$mtime
        regdirs = regdirs[order(mtime)]
        readRegistry = function(rdir) {
            # load into a private environment: no setwd(), and an object missing
            # from the files can never be picked up from an enclosing scope
            stored = new.env()
            load(file.path(rdir, "registry.RData"), envir=stored)
            load(file.path(rdir, "names.RData"), envir=stored)
            stored
        }
        stored = lapply(regdirs, readRegistry)
        regnames = vapply(stored, function(s)
            if (length(s$name) == 0) "" else as.character(s$name)[[1]], character(1))
        regs = setNames(lapply(stored, function(s) s$reg), regnames)
    }
    if (length(regs) == 0)
        return(list())
    regnames = names(regs)
    # unnamed registries match only patterns that accept the empty string
    if (is.null(regnames))
        regnames = rep("", length(regs))
    regs[grepl(name, regnames)]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment