Created
May 8, 2019 13:01
-
-
Save johndrummond/3c3ef2bb1fd0191adbee5915c1a399b9 to your computer and use it in GitHub Desktop.
scheduled long running tasks in shiny with futures
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
library(shiny) | |
library(DT) | |
library(dplyr) | |
library(future) | |
library(magrittr) | |
plan(multiprocess) | |
dedupe <- function(r) { | |
makeReactiveBinding("val") | |
observe(val <<- r(), priority = 10) | |
reactive(val) | |
} | |
ui <- fluidPage( | |
DTOutput("myTable") | |
) | |
getReactiveData <- function(){ | |
# using closure to hold state. Observe is the internal function that is returned | |
isNewCall <- TRUE # flag for whether waiting for future or not | |
updateCounter <- 0 | |
getDataFuture <- "" # have future in namespace | |
reactive({ | |
if (isNewCall) { | |
getDataFuture <<- future({ | |
dataChanged <- TRUE | |
resultData <- get_data() | |
print(paste("got data ",Sys.getpid())) | |
list( | |
isNewData = dataChanged, | |
newData = resultData | |
) | |
}) | |
isNewCall <<- FALSE | |
#check in a second whether the thread has finished | |
checkAfter <- 1000 | |
reactResult <- list( | |
isUpdateData = FALSE, | |
updateCount = updateCounter, | |
newData = NULL | |
) | |
} else { | |
if (resolved(getDataFuture)) { | |
# resultsList processed in the main thread | |
print(paste("resolved long running task ",Sys.getpid())) | |
isNewCall <<- TRUE | |
updateCounter <<- updateCounter + 1 | |
resultsList <- value(getDataFuture) | |
if (resultsList$isNewData) { | |
reactResult <- list( | |
isUpdateData = TRUE, | |
updateCount = updateCounter, | |
newData = resultsList$newData | |
) | |
} | |
#next refresh the data in 10 seconds | |
checkAfter <- 10000 | |
} else { #have a look again in a couple of seconds | |
isNewCall <<- FALSE | |
print(paste("not yet finished long running task ",Sys.getpid())) | |
checkAfter <- 1000 | |
reactResult <- list( | |
isUpdateData = FALSE, | |
updateCount = updateCounter, | |
newData = NULL | |
) | |
} | |
} | |
# when to check observer next | |
print(checkAfter) | |
invalidateLater(checkAfter,NULL) | |
reactResult | |
}) | |
} | |
server <- function(input, output, session){ | |
futureData <- getReactiveData() | |
dataChanged <- dedupe(reactive(futureData()$updateCount )) | |
tbl_vals <- eventReactive(dataChanged(), { | |
print(paste("dataChanged ", futureData()$updateCount)) | |
newData <- futureData()$newData | |
if (isTruthy(futureData()$updateCount) ) { | |
print("data now refreshed") | |
newData | |
} else { | |
print("setting initial value") | |
tibble(Value = rnorm(5)) | |
} | |
}) | |
output$myTable <- renderDT({ | |
tbl_vals() | |
}) | |
} | |
get_data <- function(){ | |
expensive_operation() | |
tibble(Value = rnorm(20)) | |
} | |
expensive_operation <- function(){ | |
invisible(rnorm(100000000)) | |
} | |
shinyApp(ui, server) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment