mclapply -> rrqlapply
## Return `default` when `x` is NULL, otherwise `x`.
use <- function(x, default) {
  if (is.null(x)) default else x
}
## Simulate a series from a fitted univariate state-space model:
##   state:       a[t+1] = Tt * a[t] + eta[t],     eta     ~ N(dt, HHt)
##   observation: y[t]   = Zt * a[t] + epsilon[t], epsilon ~ N(ct, GGt)
## Tt defaults to 1 (a random walk) and GGt to 0 (no observation error).
sim_fkf <- function(fit) {
  n   <- fit[["n"]]
  dt  <- fit[["dt"]]
  HHt <- fit[["HHt"]]
  Tt  <- use(fit[["Tt"]], 1)
  GGt <- use(fit[["GGt"]], 0)
  a0  <- fit[["a0"]]
  ct  <- 0
  Zt  <- 1
  a <- numeric(n)
  y <- numeric(n)
  eta     <- rnorm(n, dt, sqrt(HHt))
  epsilon <- rnorm(n, ct, sqrt(GGt))
  a[1] <- a0
  for (t in 1:(n - 1)) {
    a[t + 1] <- Tt * a[t] + eta[t]
    y[t]     <- Zt * a[t] + epsilon[t]
  }
  y[n] <- Zt * a[n] + epsilon[n]
  y
}
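## Not run: a quick sanity check of sim_fkf() with made-up parameter values
## (purely illustrative; Tt and GGt fall back to their defaults of 1 and 0):
## set.seed(42)
## toy <- list(n = 100, a0 = 0, dt = 0.1, HHt = 0.5)
## plot(sim_fkf(toy), type = "l", ylab = "y")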
## ------------------------------------------------------------------------
## Fit the density-dependent ("dd") model by minimising the negative
## log-likelihood from FKF::fkf() over dt, HHt and GGt; Tt is held fixed
## at 0.8.  The variances are optimised on the log scale so they stay
## positive, then transformed back afterwards.
fit_dd <- function(y,
                   init = c(dt = mean(y), HHt = log(var(y) / 2),
                            GGt = log(var(y) / 2)),
                   ...) {
  o <- optim(init,
             fn = function(par, ...)
               -fkf(dt = matrix(par[1]), HHt = matrix(exp(par[2])),
                    GGt = matrix(exp(par[3])), ...)$logLik,
             Tt = matrix(0.8), a0 = y[1], P0 = matrix(10),
             ct = matrix(0), Zt = matrix(1), yt = rbind(y),
             check.input = FALSE, ...)
  o$par[["HHt"]] <- exp(o$par[["HHt"]])
  o$par[["GGt"]] <- exp(o$par[["GGt"]])
  c(o, list(a0 = y[1], n = length(y)))
}
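## Not run: a hypothetical round trip on simulated data, assuming the FKF
## version of the time (whose fkf() accepted check.input):
## library(FKF)
## set.seed(1)
## y <- cumsum(rnorm(50, 0.1, 0.5))  # a made-up log-abundance series
## fit_dd(y)$par                     # dt, HHt, GGt on the natural scale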
## ------------------------------------------------------------------------
## robust_fit() is not defined in this file; it is presumably provided by
## the sourced knape-de-valpine.R script.
sim_di <- function(df) data.frame(logN = sim_fkf(robust_fit("ssrw", df$logN, N = 3)))
sim_dd <- function(df) data.frame(logN = sim_fkf(robust_fit("dd", df$logN, N = 3)))

Installation requires quite a few packages:

devtools::install_github("ropensci/RedisAPI")
# devtools::install_github("richfitz/RedisHeartbeat") # optional, requires libhiredis to be found
devtools::install_github("gaborcsardi/crayon")
devtools::install_github("gaborcsardi/progress")
devtools::install_github("traitecoevo/rrqueue")

You'll need a Redis server running!
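If you're not sure whether it's up, you can check from R. This is a sketch assuming rrqueue's internal RedisAPI-style connection object (the same helper appears in the main.R code below), on which Redis commands are exposed as methods:

con <- rrqueue:::redis_connection(NULL)  # same internal helper used below
con$PING()                               # should return "PONG" if Redis is reachable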

In one R session, work through the main.R file. It'll block once you get to the last line.

In another R session in the same directory, start a worker instance with:

rrqueue::worker("carl")

which will start processing jobs; you'll see the progress bar move. You can start other workers in the same directory with the same command. There is support for spawning them from the current session, but I get some weirdness with R's interrupt handling when doing that. There's also a shell script you can install to make it easy to launch them (rrqueue:::install_scripts).

The plus side is that the worker pool is dynamic: you can add and remove workers at will (though recovering when a worker dies part-way through a task requires some care).

The downside is that, at the moment, you have to worry about a lot more things; my aim is to eventually make this as smooth as mclapply. The other downside is overhead: for your case, the overhead is a significant fraction of the computing time.

But it's great to have this as an example. I'll probably keep using it as an example of someone else's workflow that I should try to make smooth.

# This is based on your Rmd file
downloader::download("https://github.com/ropensci/rgpdd/raw/master/inst/scripts/knape-de-valpine.R", "knape-de-valpine.R")
rrqueue:::queue_clean(rrqueue:::redis_connection(NULL), "carl")
packages <- c("ggplot2", "dplyr", "tidyr", "knitcitations", "rgpdd",
"FKF", "lazyeval", "reshape2")
sources <- c("functions.R", "knape-de-valpine.R")
obj <- rrqueue::queue("carl", packages, sources)
do_parallel <- function(df, f, ..., rrq) {
  grps <- groups(df)
  ids <- sapply(grps, function(i) unique(df[[as.character(i)]]))
  names(ids) <- as.character(ids)
  ## turn the grouped data.frame into a list of data.frames, one per MainID
  list_data <- lapply(ids, function(id) {
    .dots <- list(lazyeval::interp(~y == x,
                                   .values = list(y = grps[[1]], x = id)))
    filter_(df, .dots = .dots)
  })
  ## This is where the magic is.  There's some seriously nasty stuff
  ## involved in making sure the same function gets executed on the nodes
  ## as here; the env argument is there to help with that, because I
  ## apparently don't have that working quite right yet.
  rrqueue::rrqlapply(list_data, f, rrq, ..., env = .GlobalEnv)
  ## Note that the above can actually bail early and return a set of
  ## proxies to the tasks; that's going to confuse the post-processing
  ## step below :)
}
do_parallel_post_process <- function(list_out, group_name) {
  ## Reshape the list of outputs back into a single data.frame.  melt()
  ## puts the list names into a column "L1", which is renamed here to the
  ## original grouping column (group_name, e.g. "MainID").
  melt(list_out, id = names(list_out[[1]])) %>%
    rename_(.dots = setNames(list("L1"), group_name)) %>%
    as_data_frame()
}
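## Not run: hypothetical usage once the DI tasks created at the bottom of
## this file have completed (group_name must name the grouping column):
## DI_df <- do_parallel_post_process(DI, group_name = "MainID")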
## ------------------------------------------------------------------------
gpdd_main %>%
  filter(SamplingProtocol == "Count",
         SourceDimension %in% c("Count", "Index"),
         SamplingFrequency == "1",
         DatasetLength >= 15) %>%
  select(MainID) %>%
  arrange(MainID) ->
  filtered
## ------------------------------------------------------------------------
gpdd_data %>%
  filter(MainID %in% filtered$MainID) %>%
  select(MainID, Population, SampleYear) %>%
  group_by(MainID) %>%
  mutate(logN = log(Population)) ->
  df
## ------------------------------------------------------------------------
## log(0) gives -Inf for zero counts; replace those with one less than the
## smallest finite log-abundance observed.
i <- which(df$logN == -Inf)
df$logN[i] <- min(df$logN[-i]) - 1
## ----eval=FALSE----------------------------------------------------------
## #not run
## some <- sample(unique(df$MainID), 10)
## df %>% filter(MainID %in% some) -> df
## Elsewhere run n copies of:
## rrqueue::worker("carl")
source("functions.R")
df %>% group_by(MainID) %>% do_parallel(sim_di, rrq=obj) -> DI