Skip to content

Instantly share code, notes, and snippets.

@PeterVermont
Last active December 7, 2020 19:18
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PeterVermont/a4a29d2c6b88e4ee012a869dedb5099c to your computer and use it in GitHub Desktop.
Save PeterVermont/a4a29d2c6b88e4ee012a869dedb5099c to your computer and use it in GitHub Desktop.
FutureTaskProcessor.R is a self-contained utility for dispatching and tracking future tasks; it supports callbacks and extensive (optional) output. app.R is a Shiny app that uses FutureTaskProcessor.R to show how to use it within Shiny.

FutureProcessorWithShinyExample: An asynchronous task processor using the R Future library https://github.com/HenrikBengtsson/future

FutureTaskProcessor.R is a self-contained utility for dispatching and tracking future tasks; it supports callbacks and extensive (optional) output. app.R is a Shiny app that uses FutureTaskProcessor.R to show how to use it within Shiny.

.Rproj.user
.Rhistory
.RData
.Ruserdata
# Shiny demo of FutureTaskProcessor.R: start long-running tasks with the
# future package and collect their results without blocking the UI.
# Background: http://stackoverflow.com/questions/41610354/calling-a-shiny-javascript-callback-from-within-a-future/42864532#42864532

# Install any missing dependency, then attach it. requireNamespace() checks
# availability without attaching and without the warning require() emits.
if (!requireNamespace("shiny", quietly = TRUE)) install.packages("shiny")
if (!requireNamespace("future", quietly = TRUE)) install.packages("future")
if (!requireNamespace("DT", quietly = TRUE)) install.packages("DT")
if (!requireNamespace("data.table", quietly = TRUE)) install.packages("data.table")
library(shiny)
library(future)
library(DT)
library(data.table)

# FutureTaskProcessor.R comes from
# https://gist.github.com/PeterVermont/a4a29d2c6b88e4ee012a869dedb5099c
# Prefer a local copy next to app.R (so local fixes are picked up); otherwise
# fall back to the pinned gist revision, fetched over HTTPS.
if (file.exists("FutureTaskProcessor.R")) {
  source("FutureTaskProcessor.R")
} else {
  source("https://gist.githubusercontent.com/PeterVermont/a4a29d2c6b88e4ee012a869dedb5099c/raw/642374e2c12d0d6aa473542225e12e6f5ae97cec/FutureTaskProcessor.R")
}

# "multiprocess" is deprecated in the future package; multisession runs each
# task in a background R session and works on every platform.
plan(multisession)

# Output/input ids shared by ui and server.
DEBUG_CONSOLE_OUTPUT <- "debugConsoleOutput"
NUM_ASYNC_TASKS_RUNNING <- "NUM_ASYNC_TASKS_RUNNING"
ASYNC_DATA <- "ASYNC_DATA"
FAKE_DATA_INFORMATION <- "FAKE_DATA_INFORMATION"
# Page layout, assembled from named sections inside local() so the helper
# names do not leak into the global environment:
#   - a header reporting how many cores the future package can use,
#   - three labelled text readouts updated by the server,
#   - the duration input and the button that launches a fake task,
#   - the debug console table (newest message first).
ui <- local({
  coresHeader <- h5(paste0("future::availableCores(): ", future::availableCores()))

  statusReadouts <- tagList(
    h5("NUM_ASYNC_TASKS_RUNNING:"),
    textOutput(NUM_ASYNC_TASKS_RUNNING),
    h5("ASYNC_DATA:"),
    textOutput(ASYNC_DATA),
    h5("FAKE_DATA_INFORMATION:"),
    textOutput(FAKE_DATA_INFORMATION)
  )

  taskControls <- tagList(
    numericInput("duration", "Duration", value = 5, min = 0),
    actionButton("start_proc_future", h5("get data using future"))
  )

  debugSection <- tagList(
    h5("Debug console (most recent first)"),
    DT::dataTableOutput(DEBUG_CONSOLE_OUTPUT)
  )

  fluidPage(coresHeader, statusReadouts, taskControls, debugSection)
})
############################ SERVER ############################
# Server logic. Each button press starts fakeDataProcessing() in a future
# under a name that cycles through fakeProcessedData-1 .. -10. A polling
# observer collects finished futures, and their callback stores the result
# in asyncDataValues.
server <- function(input, output, session) {
  FAKE_PROCESSED_DATA <- "fakeProcessedData"
  # Task names cycle back to 1 after this many.
  MAX_ASYNC_DATA_NUMBER <- 10
  # WARNING: don't use variables to initialize keys in reactiveValues(...);
  # the variable's name, not its value, would become the key. Assign the
  # keys with [[ ]] afterwards instead.
  otherReactiveValues <- reactiveValues()
  # Completed task results, keyed by task name.
  asyncDataValues <- reactiveValues()
  # Number used to name the next task. It is a reactiveVal, not a plain
  # variable, so the "Next item to be updated" text re-renders when it changes.
  asyncDataNumber <- reactiveVal(1)

  # Start the debug log as an empty table with the right columns.
  otherReactiveValues[[DEBUG_CONSOLE_OUTPUT]] <-
    data.table::data.table(time = Sys.time(), message = "Placeholder to be deleted")[-1, ]
  otherReactiveValues[[NUM_ASYNC_TASKS_RUNNING]] <- getRunningTasksStatus()

  # Name the next task will get (reactive: reads asyncDataNumber()).
  nextDataName <- function() {
    paste0(FAKE_PROCESSED_DATA, "-", asyncDataNumber())
  }

  # Prepend a timestamped message to the debug log and echo it to the console.
  # isolate() keeps a calling observer from taking a dependency on the log.
  debugConsole <- function(msg) {
    time <- Sys.time()
    newRow <- data.table::data.table(time = time, message = msg)
    otherReactiveValues[[DEBUG_CONSOLE_OUTPUT]] <<-
      rbind(newRow, isolate(otherReactiveValues[[DEBUG_CONSOLE_OUTPUT]]))
    print(paste0(nrow(isolate(otherReactiveValues[[DEBUG_CONSOLE_OUTPUT]])), ": ", time, ": ", msg))
    flush.console()
  }

  # Callback for finished tasks. It receives the list built by
  # processRunningTasks(): asyncTaskName, taskResult, submitTime, endTime,
  # elapsedTime, caughtError, caughtWarning.
  onTaskFinished <- function(asyncCallbackResults) {
    asyncTaskName <- asyncCallbackResults[["asyncTaskName"]]
    taskResult <- asyncCallbackResults[["taskResult"]]
    caughtError <- asyncCallbackResults[["caughtError"]]
    if (is.null(caughtError)) {
      debugConsole(
        paste0(
          "callback asyncTaskName '",
          asyncTaskName,
          "' returning with data of size ",
          object.size(taskResult)
        )
      )
      asyncDataValues[[asyncTaskName]] <<- taskResult
    } else {
      debugConsole(
        paste0(
          "callback asyncTaskName '",
          asyncTaskName,
          "' failed: ",
          conditionMessage(caughtError)
        )
      )
    }
    otherReactiveValues[[NUM_ASYNC_TASKS_RUNNING]] <<- getRunningTasksStatus()
  }

  # processRunningTasks() must be polled for callbacks to fire. isolate()
  # stops reactive values read by the callbacks from re-triggering this
  # observer; it re-runs only on the timer.
  observe({
    invalidateLater(200)
    isolate(processRunningTasks(debug = TRUE))
  })

  output[[FAKE_DATA_INFORMATION]] <- renderText({
    paste("Next item to be updated: ", nextDataName())
  })

  output[[ASYNC_DATA]] <- renderText({
    myList <- reactiveValuesToList(asyncDataValues)
    if (length(myList) == 0) {
      return("")
    }
    unlist(
      lapply(myList, function(item) {
        paste0("====> ", item$name, ": ", item$test, " ")
      }),
      use.names = FALSE
    )
  })

  output[[NUM_ASYNC_TASKS_RUNNING]] <- renderText({
    otherReactiveValues[[NUM_ASYNC_TASKS_RUNNING]]
  })

  observeEvent(input$start_proc_future, {
    # Copy the input into a plain local: the future runs in another process,
    # outside any reactive context, where input$ cannot be read.
    duration <- input$duration
    dataName <- nextDataName()
    if (!isTRUE(duration >= 0)) {
      debugConsole(paste0("not starting '", dataName, "': duration must be a number >= 0"))
    } else if (dataName %in% names(asyncTasksRunning)) {
      # Checked before creating the future so a rejected task is never launched.
      debugConsole(paste0("not starting '", dataName, "': a task with that name is still running"))
    } else {
      # An uncaught error in an observer ends the Shiny session, so failures
      # to start are logged instead.
      started <- tryCatch({
        # fakeDataProcessing() is defined in FutureTaskProcessor.R
        startAsyncTask(
          dataName,
          future(fakeDataProcessing(dataName, duration)),
          callback = onTaskFinished
        )
        TRUE
      }, error = function(e) {
        debugConsole(paste0("could not start '", dataName, "': ", conditionMessage(e)))
        FALSE
      })
      if (started) {
        # Advance 1, 2, ..., MAX_ASYNC_DATA_NUMBER, then wrap back to 1.
        asyncDataNumber(asyncDataNumber() %% MAX_ASYNC_DATA_NUMBER + 1)
      }
    }
    otherReactiveValues[[NUM_ASYNC_TASKS_RUNNING]] <<- getRunningTasksStatus()
  },
  ignoreNULL = TRUE,
  ignoreInit = TRUE)

  output[[DEBUG_CONSOLE_OUTPUT]] <- DT::renderDataTable({
    DT::datatable(otherReactiveValues[[DEBUG_CONSOLE_OUTPUT]])
  })
}
# Launch the Shiny application from the ui and server objects.
shinyApp(server = server, ui = ui)
Version: 1.0
RestoreWorkspace: Default
SaveWorkspace: Default
AlwaysSaveHistory: Default
EnableCodeIndexing: Yes
UseSpacesForTab: Yes
NumSpacesForTab: 2
Encoding: UTF-8
RnwWeave: Sweave
LaTeX: pdfLaTeX
AutoAppendNewline: Yes
StripTrailingWhitespace: Yes
# FutureTaskProcessor.R https://gist.github.com/PeterVermont/a4a29d2c6b88e4ee012a869dedb5099c#file-futuretaskprocessor-r
# Tracks futures started with startAsyncTask(), polls them with
# processRunningTasks() and runs each task's callback once it finishes.
#
# NOTE: the file that source()s this should also choose a future plan, e.g.
#   plan(multisession, workers = min((myNumTasks + 1), MAX_PROCESSES))
# (Older versions of this note used plan(multiprocess), which is deprecated
# in the future package.) If workers is not given, it defaults to
# future::availableCores(). The original author adds 1 to myNumTasks because
# "future uses one process for itself".
# NOTE(review): that rationale is unverified; confirm against the future docs.
if (!requireNamespace("future", quietly = TRUE)) install.packages("future")
library(future)

# Registry of tracked tasks, keyed by task name. startAsyncTask() adds
# entries and processRunningTasks() removes them once they are collected.
asyncTasksRunning <- list()
#' Register a future so that processRunningTasks() tracks it and runs
#' `callback` once it finishes.
#'
#' @param asyncTaskName Unique task name. Starting a task whose name is still
#'   tracked in `asyncTasksRunning` is an error.
#' @param futureObj A future (e.g. from future::future()). Its `lazy` field is
#'   read to warn about futures that will not start eagerly.
#' @param callback Optional function called by processRunningTasks() with one
#'   list argument (asyncTaskName, taskResult, submitTime, endTime,
#'   elapsedTime, caughtError, caughtWarning).
#' @param debug If TRUE, print a status line.
#' @return Invisibly, the tracking record stored in `asyncTasksRunning`.
startAsyncTask <-
  function(asyncTaskName,
           futureObj,
           callback = NULL,
           debug = FALSE) {
    submitTime <- Sys.time()
    # Reject duplicates BEFORE touching `futureObj`. R arguments are lazy
    # promises, so for startAsyncTask(name, future(...)) the future is only
    # created (and launched) when first accessed. Checking first means a
    # rejected task is never started and orphaned.
    if (asyncTaskName %in% names(asyncTasksRunning)) {
      stop(
        "Error: A task with the same asyncTaskName '",
        asyncTaskName,
        "' is already running. It is not known if it is running the same task"
      )
    }
    if (isTRUE(futureObj$lazy)) {
      warning(
        paste0(
          "startAsyncTask futureObj has lazy=TRUE! '",
          asyncTaskName,
          "' will not be started until processRunningTasks is called with wait=TRUE and will then only run one item at a time!"
        )
      )
    }
    if (debug) {
      print(paste0(
        submitTime,
        ": startAsyncTask asyncTaskName '",
        asyncTaskName,
        "' called. There are now ", length(asyncTasksRunning) + 1, " current tasks."
      ))
    }
    asyncTaskObject <- list(
      futureObj = futureObj,
      taskName = asyncTaskName,
      callback = callback,
      submitTime = submitTime
    )
    asyncTasksRunning[[asyncTaskName]] <<- asyncTaskObject
  } #end startAsyncTask
# Count of tasks still held in the `asyncTasksRunning` registry, i.e. started
# but not yet collected by processRunningTasks().
getNumberOfRunningTasks <- function() length(asyncTasksRunning)
# One-line status summary: a timestamp, the number of tracked tasks, then one
# "[...]" entry per task giving its elapsed time and resolved() state.
# Entries that are NULL or have fewer than 4 fields are shown as "[NULL]".
getRunningTasksStatus <- function() {
  describeTask <- function(task) {
    isWellFormed <- !is.null(task) && length(names(task)) >= 4
    if (!isWellFormed) {
      return("[NULL]")
    }
    elapsed <- format(Sys.time() - task[["submitTime"]])
    paste0(
      "[",
      task[["taskName"]],
      "'s elapsed time: ",
      elapsed,
      ", Finished?: ",
      resolved(task[["futureObj"]]),
      "]"
    )
  }
  stamp <- Sys.time()
  taskCount <- length(asyncTasksRunning)
  descriptions <- lapply(asyncTasksRunning, describeTask)
  paste(
    stamp,
    ": # of running tasks: ",
    taskCount,
    paste0(descriptions, collapse = ", ")
  )
} #end getRunningTasksStatus
#' Check every tracked task and collect the ones that have finished.
#'
#' Meant to be called periodically (e.g. from a Shiny observer driven by
#' invalidateLater()). For each finished task this function:
#' 1. fetches the task's value,
#' 2. removes the task from `asyncTasksRunning`,
#' 3. calls the task's callback (if any) with a list of asyncTaskName,
#'    taskResult, submitTime, endTime, elapsedTime, caughtError and
#'    caughtWarning.
#'
#' @param wait If TRUE, unfinished tasks are collected too: value() blocks
#'   until they finish.
#' @param catchErrors If TRUE, an error raised while fetching a value is
#'   logged and passed to the callback as `caughtError` (with a NULL
#'   taskResult); the task is still removed. If FALSE, the error propagates to
#'   the caller and the task stays tracked.
#' @param debug If TRUE, print progress messages.
#' @param maximumTasksToResolve Optional limit (>= 1) on how many tasks are
#'   collected in this call.
#' @return Number of tasks still tracked, so it can be used as a boolean.
processRunningTasks <-
  function(wait = FALSE,
           catchErrors = TRUE,
           debug = FALSE,
           maximumTasksToResolve = NULL) {
    if (!is.null(maximumTasksToResolve) &&
        (maximumTasksToResolve < 1)) {
      stop(
        paste0(
          "processRunningTasks called with maximumTasksToResolve=",
          maximumTasksToResolve,
          " which does not make sense. It must be greater than 0 if specified"
        )
      )
    }
    functionStartTime <- Sys.time()
    numTasksResolved <- 0
    # Loop over a snapshot of the names: callbacks may start or remove tasks.
    for (asyncTaskName in names(asyncTasksRunning)) {
      if (!is.null(maximumTasksToResolve) &&
          (numTasksResolved >= maximumTasksToResolve)) {
        break
      }
      asyncTaskObject <- asyncTasksRunning[[asyncTaskName]]
      # A callback run earlier in this loop may already have removed it.
      if (is.null(asyncTaskObject)) {
        next
      }
      asyncFutureObject <- asyncTaskObject[["futureObj"]]
      isObjectResolved <- resolved(asyncFutureObject)
      if (!(isObjectResolved || wait)) {
        next
      }
      if (debug && !isObjectResolved) {
        print(
          paste0(
            Sys.time(),
            ": processRunningTasks about to wait for task '",
            asyncTaskName,
            "' to finish. ", length(asyncTasksRunning), " tasks still running."
          )
        )
      }
      numTasksResolved <- numTasksResolved + 1
      # value() re-signals any error/warning raised while evaluating the
      # future, as if we had evaluated the expression ourselves.
      caughtError <- NULL
      caughtWarning <- NULL
      if (catchErrors) {
        taskResult <- tryCatch(
          withCallingHandlers(
            value(asyncFutureObject),
            warning = function(w) {
              # `<<-` stores the condition in this function's frame; a plain
              # `<-` would only bind it inside the handler.
              caughtWarning <<- w
              print(
                paste0(
                  Sys.time(),
                  ": ***WARNING*** processRunningTasks: '",
                  asyncTaskName,
                  "' returned a warning: ",
                  conditionMessage(w)
                )
              )
              print(sys.calls())
            },
            error = function(e) {
              caughtError <<- e
              print(
                paste0(
                  Sys.time(),
                  ": ***ERROR*** processRunningTasks: '",
                  asyncTaskName,
                  "' returned an error: ",
                  conditionMessage(e)
                )
              )
              # Calling handler: the stack is still at the signal site here.
              print(sys.calls())
            }
          ),
          # withCallingHandlers() lets errors keep propagating. This outer
          # tryCatch() stops them so the failed task is removed below
          # instead of being retried on every poll.
          error = function(e) NULL
        )
      } else {
        # Errors propagate to the caller; the task remains tracked.
        taskResult <- value(asyncFutureObject)
      }
      submitTime <- asyncTaskObject[["submitTime"]]
      endTime <- Sys.time()
      elapsedTime <- format(endTime - submitTime)
      callback <- asyncTaskObject[["callback"]]
      # Stop tracking before reporting, so the count below excludes this task.
      asyncTasksRunning[[asyncTaskName]] <<- NULL
      if (debug) {
        print(
          paste0(
            Sys.time(),
            ": processRunningTasks finished: '",
            asyncTaskName,
            "' and there are ",
            getNumberOfRunningTasks(),
            " additional tasks still running.",
            " Elapsed time since submitted: ",
            elapsedTime
          )
        )
      }
      if (!is.null(callback)) {
        callback(
          list(
            asyncTaskName = asyncTaskName,
            taskResult = taskResult,
            submitTime = submitTime,
            endTime = endTime,
            elapsedTime = elapsedTime,
            caughtError = caughtError,
            caughtWarning = caughtWarning
          )
        )
      }
    } #end loop over async data items being loaded
    if (debug && (numTasksResolved > 0)) {
      print(
        paste0(
          Sys.time(),
          ": processRunningTasks with wait=",
          wait,
          " exiting after resolving: ",
          numTasksResolved,
          " tasks. Elapsed time in function: ",
          format(Sys.time() - functionStartTime),
          " tasks still running: ",
          length(asyncTasksRunning)
        )
      )
    }
    return(length(asyncTasksRunning))
  } # end processRunningTasks
# Simulate a slow computation that takes `duration` seconds, then return a
# one-row data.frame holding `name` and the completion time.
#
# With sys_sleep = TRUE it simply sleeps. Otherwise it polls once per second
# and prints the elapsed time, so progress is visible in the worker's output.
fakeDataProcessing <- function(name, duration, sys_sleep = FALSE) {
  if (sys_sleep) {
    Sys.sleep(duration)
  } else {
    start_time <- Sys.time()
    repeat {
      # Pin the units to seconds. `Sys.time() - start_time` picks units
      # automatically (secs, then mins, hours, ...). Comparing that directly
      # with `duration` would treat e.g. 90 as 90 MINUTES once a minute passed.
      elapsed_time <- difftime(Sys.time(), start_time, units = "secs")
      print(paste0(
        Sys.time(),
        ": ",
        name,
        " elapsed time: ",
        format(elapsed_time)
      ))
      if (as.numeric(elapsed_time) >= duration) {
        break
      }
      Sys.sleep(1)
    } #end repeat
  } #end else not using long sleep
  data.frame(name = name, test = Sys.time())
} #end fakeDataProcessing
# Manual smoke test. Starts `loops` fake tasks whose durations grow by one
# second each (4, 5, 6, ... seconds), polls after each start, then waits for
# all of them to finish.
#
# loops: number of tasks to start; defaults to one fewer than the cores
#   future::availableCores() reports. Zero starts no tasks.
testAsync <- function(loops = future::availableCores() - 1) {
  # "multiprocess" is deprecated in the future package; multisession works
  # on every platform.
  plan(multisession)
  print(paste0("future::availableCores(): ", future::availableCores()))
  baseWait <- 3
  # seq_len() (not 1:loops) so that loops == 0 runs no iterations.
  for (loopNumber in seq_len(loops)) {
    duration <- baseWait + loopNumber
    dataName <-
      paste0("FAKE_PROCESSED_DATA_testLoop-",
             loopNumber,
             "_duration-",
             duration)
    startAsyncTask(
      dataName,
      futureObj = future(lazy = FALSE, expr = fakeDataProcessing(dataName, duration)),
      debug = TRUE
    )
    # NOTE: if the future is created with lazy=TRUE, the process is not started
    # until value() is called on it; resolved(futureObj) does not start it.
    processRunningTasks(wait = FALSE, debug = TRUE)
  } #end loop
  # Block until every task has finished.
  processRunningTasks(wait = TRUE, debug = TRUE)
  print(paste0(
    "At the end the status should have no running tasks: ",
    getRunningTasksStatus()
  ))
} #end testAsync
#testAsync()
@PeterVermont
Copy link
Author

This includes an example of use called testAsync()

@PeterVermont
Copy link
Author

Added app.R, a complete Shiny app that uses FutureTaskProcessor.R to run asynchronous tasks within Shiny.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment