Skip to content

Instantly share code, notes, and snippets.

@zola-25
Last active May 6, 2020 17:27
Show Gist options
  • Save zola-25/ddfd45719fc69d3d987ab63c49790897 to your computer and use it in GitHub Desktop.
Save zola-25/ddfd45719fc69d3d987ab63c49790897 to your computer and use it in GitHub Desktop.
Asynchronous REST Operations in R Plumber with futures
# plumber_asynchronous.R
require(future)
require(uuid)
plan(multiprocess)
defaultPackages <- c("plyr",
"dplyr",
"dbplyr",
"reshape2",
"neuralnet",
...whatever you need)
defaultGlobals<-c("workingDir")
workingDir<-getwd()
executingFutures <- list()
completedFutures <- list()
#' Being an asynchronous analysis for the provided <analysisId>
#' @serializer unboxedJSON
#' @post /analysis/<analysisId>/run
function(res, analysisId){
analysisId<-as.integer(analysisId)
uniqueId<-UUIDgenerate()
f<-future(
{
setwd(workingDir)
source("./analysis.R")
analysisResult <- runAnalysis(analysisId) # Run anything you like as long as it is in a package or sourced
return(list(
completedLocation=paste0("/resource/", uniqueId, "/result"),
result=analysisResult))
},
globals=c(defaultGlobals,
"analysisId",
"uniqueId"),
packages=c(defaultPackages)
)
executingFutures[[as.character(uniqueId)]] <<- f
return(resourceAcceptedResponse(res, uniqueId))
}
resourceAcceptedResponse <- function(res, uniqueId) {
queueLocation <- paste0("/queuedResource/", uniqueId, "/status")
res$status <- 202
res$setHeader("location", queueLocation)
return(list(message=paste0("This resource is being created. Keep checking back at GET ", queueLocation, ", when completed you will be redirected to the completed resource"),
location=queueLocation))
}
#' @serializer unboxedJSON
#' @get /queuedResource/<uniqueId>/status
function(res, uniqueId){
executingFuture<-executingFutures[[uniqueId]]
if(!is.null(executingFuture)){
if(resolved(executingFuture)) {
#executingFuture is no longer executing and has resolved
# move from executing to resolved list
executingFutures[[as.character(uniqueId)]] <<- NULL
completedFutures[[as.character(uniqueId)]] <<- executingFuture
return(resourceCompletedRedirect(res, executingFuture))
} else {
# still executing
return(resourceAcceptedResponse(res, uniqueId))
}
}
resolvedFuture <- completedFutures[[uniqueId]]
if(is.null(resolvedFuture)) {
return(resourceNotFoundResponse(res, uniqueId))
}
return(resourceCompletedRedirect(res, resolvedFuture))
}
resourceCompletedRedirect <- function (res, f) {
futureValue <- value(f)
res$setHeader("location", futureValue$completedLocation)
res$status <- 303
return(list(message=paste0("Redirecting to completed resource location ", futureValue$completedLocation),
location=futureValue$completedLocation))
}
resourceNotFoundResponse <- function(res, uniqueId) {
res$status <- 404
return(list(message=paste0("Resource with ID ", uniqueId, " not found. Cache may have expired, please try recreating the resource.")))
}
#' @serializer unboxedJSON
#' @get /resource/<uniqueId>/result
function(res, uniqueId){
if(is.null(uniqueId)) {
res$status = 404
return(list(message="{uniqueId} not provided. Format is GET /resource/{uniqueId}/result to retrieve completed resources"))
}
f <- completedFutures[[as.character(uniqueId)]]
if(is.null(f)) {
return(resourceNotFoundResponse(res, uniqueId))
}
return (value(f)$result)
}
# plumber_synchronous.R
source("./analysis.R")
#' Get then analysis result for the provided <analysisId>
#' @serializer unboxedJSON
#' @get /analysis/<analysisId>/result
function(analysisId){
analysisResult <- runAnalysis(analysisId)
return(analysisResult)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment