Skip to content

Instantly share code, notes, and snippets.

@ar0ch
Forked from jcheng5/create_forked_task.R
Created April 3, 2018 13:10
Show Gist options
  • Save ar0ch/0dcbff7420f0c2ea37e5045f84f0e2b9 to your computer and use it in GitHub Desktop.
Save ar0ch/0dcbff7420f0c2ea37e5045f84f0e2b9 to your computer and use it in GitHub Desktop.
Concurrent, forked, cancellable tasks in Shiny
library(shiny)
# Also uses parallel, shinyjs, tools
# Create a long-running task, executed in a forked process. (Doesn't work on
# Windows.)
#
# Arguments:
# - expr: The expression to run. It is passed lazily and only forced inside
#   the forked child process, so the calling process is never blocked by it.
# - poll_ms: How often, in milliseconds, the parent checks whether the child
#   has delivered a result. Defaults to 100.
#
# The return value is a promise-like object (a list) with three methods:
# - completed(): FALSE initially, then TRUE if the task succeeds, fails, or
#   is cancelled. Reactive, so when the state changes any reactive readers
#   will invalidate.
# - result(): Use this to get the return value. While execution is in
#   progress, performs a req(FALSE). If the task succeeded, returns the
#   return value. If it failed, re-throws the error. If it was cancelled,
#   signals a validation error. Reactive, so when the state changes any
#   reactive readers will invalidate.
# - cancel(): Call this to prematurely terminate the task. Does nothing if
#   the task is no longer running.
create_forked_task <- function(expr, poll_ms = 100) {
  stopifnot(
    is.numeric(poll_ms),
    length(poll_ms) == 1L,
    !is.na(poll_ms),
    poll_ms > 0
  )

  # "state" is always one of "running", "success", "error" or "cancel". It is
  # a reactive binding, so anything that reads it from a reactive context is
  # invalidated when it is reassigned with `<<-` below.
  makeReactiveBinding("state")
  state <- "running"
  result <- NULL

  # Launch the task in a forked process. This always returns immediately,
  # and we get back a handle we can use to monitor or kill the job.
  task_handle <- parallel::mcparallel({
    force(expr)
  })

  # Poll every `poll_ms` milliseconds until the job completes.
  poller <- observe({
    res <- parallel::mccollect(task_handle, wait = FALSE)
    if (is.null(res)) {
      # Nothing delivered yet; check again later.
      invalidateLater(poll_ms)
      return()
    }

    # A result arrived, so polling is finished.
    poller$destroy()

    # An error in the child comes back as a single "try-error" object whose
    # "condition" attribute holds the original condition.
    failed <- is.list(res) && length(res) == 1 &&
      inherits(res[[1]], "try-error")
    if (failed) {
      result <<- attr(res[[1]], "condition", exact = TRUE)
      state <<- "error"
    } else {
      result <<- res[[1]]
      state <<- "success"
    }
  })

  list(
    completed = function() {
      state != "running"
    },
    result = function() {
      # Reading "state" here takes a reactive dependency on it, so the
      # calling context re-runs when the task finishes or is cancelled.
      switch(state,
        # Still running: abort the current context silently.
        running = req(FALSE),
        success = result,
        # Re-throw the condition captured in the child process.
        error = stop(result),
        cancel = validate(need(FALSE, "The operation was cancelled"))
      )
    },
    cancel = function() {
      if (state == "running") {
        state <<- "cancel"
        poller$destroy()
        tools::pskill(task_handle$pid, tools::SIGTERM)
        # NOTE(review): a negative pid is the POSIX convention for signalling
        # a whole process group; confirm tools::pskill forwards it on the R
        # versions you target.
        tools::pskill(-task_handle$pid, tools::SIGTERM)
        # Try to collect the terminated child without blocking.
        parallel::mccollect(task_handle, wait = FALSE)
      }
    }
  )
}
library(shiny)
source("create_forked_task.R")
# Page layout: Start/Stop controls followed by a table of the task's output.
ui <- fluidPage(
  # Initialize the shinyjs library (used to enable/disable the buttons)
  shinyjs::useShinyjs(),
  # Job controls; "Stop" is rendered disabled
  actionButton(inputId = "start", label = "Start"),
  shinyjs::disabled(
    actionButton(inputId = "stop", label = "Stop")
  ),
  # Job output is displayed here
  tableOutput(outputId = "out")
)
# Server logic: "Start" launches a forked task and shows a progress message,
# the "out" table renders the task's result, and "Stop" cancels the task.
server <- function(input, output, session) {
  # Make "task" behave like a reactive value, so that assigning a new task
  # with `<<-` invalidates everything that reads it.
  makeReactiveBinding("task")
  task <- NULL

  output$out <- renderTable({
    # The task starts out NULL but is required. The req() takes
    # care of ensuring that we only proceed if it's non-NULL.
    req(task)$result()
  })

  observeEvent(input$start, {
    shinyjs::enable("stop")
    shinyjs::disable("start")

    task <<- create_forked_task({
      # Pretend this takes a long time
      Sys.sleep(5)
      cars[sample(nrow(cars), 10), ]
    })

    # Show progress message during task start
    prog <- Progress$new(session)
    prog$set(message = "Executing task, please wait...")

    o <- observe({
      # Only proceed when the task is completed (this could mean success,
      # failure, or cancellation)
      req(task$completed())

      # This observer only runs once
      o$destroy()

      # Close the progress indicator and update button state
      prog$close()
      shinyjs::disable("stop")
      shinyjs::enable("start")
    })
  })

  observeEvent(input$stop, {
    # "task" is NULL until the first start. Ignore a stop event that arrives
    # before any task exists instead of failing on NULL$cancel().
    req(task)
    task$cancel()
  })
}
# Build the Shiny app object from the UI definition and server function.
shinyApp(ui = ui, server = server)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment