Skip to content

Instantly share code, notes, and snippets.

@DarwinAwardWinner
Last active December 24, 2015 08:39
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DarwinAwardWinner/6771922 to your computer and use it in GitHub Desktop.
Save DarwinAwardWinner/6771922 to your computer and use it in GitHub Desktop.
Proof of concept for a wrapper for ShortRead::FastqStreamer that preloads the next set of reads while the calling code is processing the previous set.
library(ShortRead)
library(parallel)
## Emit a message prefixed with the current time (microsecond
## resolution) followed by ": ". All arguments are forwarded to message().
tsmsg <- function(...) {
  stamp <- format(Sys.time(), "%Y-%m-%d %H:%M:%OS6")
  message(stamp, ": ", ...)
}
## Like parallel:::readChild, but friendlier to consume:
##   * returns NULL once the child is finished (readChild answers with an
##     integer pid or NULL in that case),
##   * automatically unserializes the raw payload sent by the child,
##   * raises an error if the payload is a "try-error" object. mcparallel
##     evaluates the child expression inside try(), so a failure in the
##     child would otherwise be handed back as if it were a regular result
##     and be processed as data by the caller.
##
## Arguments:
##   child - process handle as returned by mcparallel()/mcfork().
## Value:
##   The unserialized object sent by the child, or NULL when the child has
##   no more data.
readChildSafe <- function(child) {
  res <- parallel:::readChild(child)
  if (is.integer(res) || is.null(res)) {
    return(NULL)
  }
  value <- unserialize(res)
  if (inherits(value, "try-error")) {
    ## try() attaches the original condition; fall back to the character
    ## payload when it is absent.
    cond <- attr(value, "condition")
    detail <- if (inherits(cond, "condition")) {
      conditionMessage(cond)
    } else {
      as.character(value)
    }
    stop("child process failed: ", detail, call. = FALSE)
  }
  value
}
## S4 class holding the handle of a child process that preloads yields.
## The `proc` slot is declared as "ANY"; the validity check is what
## requires it to be a "process" object.
.PreloadProcWrapper <- setClass(
  "PreloadProcWrapper",
  slots = c(proc = "ANY"),
  validity = function(object) {
    if (!is(object@proc, "process")) {
      return("proc must be a process")
    }
    TRUE
  }
)

## User-facing constructor: wraps `proc` in a PreloadProcWrapper.
PreloadProcWrapper <- function(proc) {
  .PreloadProcWrapper(proc = proc)
}
## yield() for a PreloadProcWrapper: writes a newline to the child's stdin
## to request the next result, then reads the child's answer via
## readChildSafe() and returns it.
setMethod(
  "yield",
  signature = c(x = "PreloadProcWrapper"),
  definition = function(x, ...) {
    child <- x@proc
    parallel:::sendChildStdin(child, "\n")
    readChildSafe(child)
  }
)
## Builds a preloading variant of a stream constructor.
##
## Arguments:
##   constructor - function that creates the underlying stream.
##   yieldMethod - function that pulls the next chunk from that stream.
## Value:
##   A function taking the same arguments as `constructor`. Calling it
##   starts a child process (via mcparallel) that builds the stream and
##   fetches each chunk ahead of time, and returns a PreloadProcWrapper
##   around that child process.
preloadWrapper <- function(constructor, yieldMethod) {
  function(...) {
    ctorArgs <- list(...)
    child <- mcparallel({
      src <- do.call(constructor, ctorArgs)
      served <- 0
      repeat {
        ## Fetch the next chunk before anyone has asked for it.
        chunk <- yieldMethod(src)
        if (length(chunk) == 0) {
          break
        }
        served <- served + 1
        tsmsg("CHILD: Preloaded yield ", served, ".")
        ## Block until a request line arrives on stdin, then hand over
        ## the chunk that is already loaded.
        readLines(con = stdin(), n = 1)
        parallel:::sendMaster(chunk)
        tsmsg("CHILD: Sent yield ", served, ".")
      }
      ## The child's final result is NULL.
      NULL
    })
    PreloadProcWrapper(child)
  }
}
## Preloading counterpart of FastqStreamer: the stream is built with
## FastqStreamer and chunks are pulled with yield.
PreloadingFastqStreamer <- preloadWrapper(
  constructor = FastqStreamer,
  yieldMethod = yield
)
## Demo: stream the example FASTQ file shipped with ShortRead in chunks
## of 50, sleeping for a second per chunk to stand in for real work.
{
  sp <- SolexaPath(system.file("extdata", package = "ShortRead"))
  fl <- file.path(analysisPath(sp), "s_1_sequence.txt")
  fqs <- PreloadingFastqStreamer(fl, n = 50)
  Sys.sleep(1)
  nyield <- 0
  repeat {
    tsmsg("MAIN: Requesting yield ", nyield + 1, ".")
    x <- yield(fqs)
    if (length(x) == 0) {
      break
    }
    nyield <- nyield + 1
    tsmsg("MAIN: Received yield ", nyield, ".")
    ## Long calculation here
    Sys.sleep(1)
    tsmsg("MAIN: Processed yield ", nyield, ".")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment