Skip to content

Instantly share code, notes, and snippets.

@dancingfrog
Created December 5, 2022 05:10
Show Gist options
  • Save dancingfrog/b9527bf40dde86774b95eb64ec11ddb3 to your computer and use it in GitHub Desktop.
Save dancingfrog/b9527bf40dde86774b95eb64ec11ddb3 to your computer and use it in GitHub Desktop.
Plumber with proxy to Shiny (toy) app running in background cluster
library("future")
library("httr")
library("magrittr")
library("plumber")
library("shiny")
library("uuid")
library("websocket")
library("R6")
shiny_port <- 5174
# Define UI for app that draws a histogram ----
ui <- shiny::fluidPage(
# App title ----
shiny::titlePanel("Hello Shiny!"),
# Sidebar layout with input and output definitions ----
shiny::sidebarLayout(
# Sidebar panel for inputs ----
shiny::sidebarPanel(
# Input: Slider for the number of bins ----
shiny::sliderInput(inputId = "bins",
label = "Number of bins:",
min = 1,
max = 50,
value = 30)
),
# Main panel for displaying outputs ----
shiny::mainPanel(
# Output: Histogram ----
shiny::plotOutput(outputId = "distPlot")
)
)
)
# Define server logic required to draw a histogram ----
server <- function(input, output) {
# Histogram of the Old Faithful Geyser Data ----
# with requested number of bins
# This expression that generates a histogram is wrapped in a call
# to renderPlot to indicate that:
#
# 1. It is "reactive" and therefore should be automatically
# re-executed when inputs (input$bins) change
# 2. Its output type is a plot
output$distPlot <- shiny::renderPlot({
x <- faithful$waiting
bins <- seq(min(x), max(x), length.out = input$bins + 1)
hist(x, breaks = bins, col = "#007bc2", border = "white",
xlab = "Waiting time to next eruption (in mins)",
main = "Histogram of waiting times")
})
}
future_cluster <- future::makeClusterPSOCK(1) #future::availableCores())
parallelly::autoStopCluster(future_cluster)
for (cl_idx in seq_along(future_cluster)) {
parallel:::sendCall(future_cluster[[cl_idx]], fun = Sys.getpid, args = list())
cl_pid <- parallel:::recvResult(future_cluster[[cl_idx]])
attr(future_cluster[[cl_idx]]$host, "pid") <- cl_pid
}
future::plan(future::cluster, workers = future_cluster)
shiny_app_future <- future::future({
shiny_app <- shiny::runApp(
app = shiny::shinyApp(ui = ui, server = server),
host = "0.0.0.0",
port = shiny_port,
launch.browser = FALSE
)
return(NULL)
})
# From https://gabrielcp.medium.com/going-real-time-in-r-plumber-with-websockets-93547c767412
PlumberWebSocket <- R6::R6Class(
"PlumberWebSocket",
inherit = plumber::Plumber,
public = list(
onWSOpen = function(ws) {
if (is.function(private$ws_open)) {
private$ws_open(ws)
}
invisible(self)
},
websocket = function(open = NULL) {
if (!is.null(open)) stopifnot(is.function(open))
private$ws_open <- open
}
),
private = list(
ws_open = NULL
)
)
plumbr <- PlumberWebSocket$new()
plumbr$onWSOpen()
clients <- list()
addClient <- function(ws_client, message) {
ws_client$request$uuid <- UUIDgenerate()
print(clients)
for(client in clients) {
print(clients)
print("\n")
}
print(ws_client$request$uuid %in% names(clients))
if (!(ws_client$request$uuid %in% names(clients))) {
clients[[ws_client$request$uuid]] <<- ws_client #<<- modifies clients globally
ws_client$onClose(function() {
removeClient(ws_client$request$uuid)
})
ws_client$request$ws_shiny <- websocket::WebSocket$new(paste0("ws://127.0.0.1", ":", shiny_port, "/"))
ws_client$request$ws_shiny$connect()
ws_client$request$ws_shiny$onMessage(function(event) {
print("Relay Shiny server event message to websocket client...")
ws_client$send(event$data)
print(event)
})
}
return(clients)
}
removeClient <- function(uuid) {
clients[[uuid]] <<- NULL
}
handleWebsockets <- function(pr) {
pr$websocket(
function (ws_client) {
addClient(ws_client)
print("New user connected!")
ws_client$onMessage(function(binary, message) {
if ("ws_shiny" %in% names(ws_client$request)) {
print("Relay websocket message to Shiny server...")
Sys.sleep(1)
ws_client$request$ws_shiny$send(as.character(message))
print(message)
}
})
}
)
pr
}
plumbr %>%
handleWebsockets %>%
plumber::pr_filter("myFilter", function(req){
cat(as.character(Sys.time()), "-",
req$REQUEST_METHOD, req$PATH_INFO, "-",
req$HTTP_USER_AGENT, "@", req$REMOTE_ADDR, "\n")
plumber::forward()
}) %>%
plumber::pr_handle(c("GET", "POST"), "/", function (req, res) {
# "<html>
# <h1>Hello, World!</h1>
# </html>"
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}, serializer = plumber::serializer_html()) %>%
plumber::pr_handle(c("GET", "POST"), "/<file>", function (file, req, res) {
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
cat(paste0(file, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}) %>%
plumber::pr_handle(c("GET", "POST"), "/<path1>/<file>.css", function (path1, file, req, res) {
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
cat(paste0(path1, "/", file, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}, serializer = plumber::serializer_text(
serialize_fn = as.character,
type = "text/css"
)) %>%
plumber::pr_handle(c("GET", "POST"), "/<path1>/<file>.js", function (path1, file, req, res) {
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
cat(paste0(path1, "/", file, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}, serializer = plumber::serializer_text(
serialize_fn = as.character,
type = "application/javascript"
)) %>%
plumber::pr_handle(c("GET", "POST"), "/<path1>/<file>", function (path1, file, req, res) {
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
cat(paste0(path1, "/", file, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}) %>%
plumber::pr_handle(c("GET", "POST"), "/<path1>/<path2>/<file>.css", function (path1, path2, file, req, res) {
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
cat(paste0(path1, "/", path2, "/", file, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}, serializer = plumber::serializer_text(
serialize_fn = as.character,
type = "text/css"
)) %>%
plumber::pr_handle(c("GET", "POST"), "/<path1>/<path2>/<file>.js", function (path1, path2, file, req, res) {
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
cat(paste0(path1, "/", path2, "/", file, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}, serializer = plumber::serializer_text(
serialize_fn = as.character,
type = "application/javascript"
)) %>%
plumber::pr_handle(c("GET", "POST"), "/<path1>/<path2>/<file>", function (path1, path2, file, req, res) {
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
cat(paste0(path1, "/", path2, "/", file, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}) %>%
plumber::pr_handle(c("GET", "POST"), "/<path1>/<path2>/<path3>/<file>.css", function (path1, path2, path3, file, req, res) {
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
cat(paste0(path1, "/", path2, "/", path3, "/", file, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}, serializer = plumber::serializer_text(
serialize_fn = as.character,
type = "text/css"
)) %>%
plumber::pr_handle(c("GET", "POST"), "/<path1>/<path2>/<path3>/<file>.js", function (path1, path2, path3, file, req, res) {
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
cat(paste0(path1, "/", path2, "/", path3, "/", file, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}, serializer = plumber::serializer_text(
serialize_fn = as.character,
type = "application/javascript"
)) %>%
plumber::pr_handle(c("GET", "POST"), "/<path1>/<path2>/<path3>/<file>", function (path1, path2, path3, file, req, res) {
cat(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO, "\n"))
cat(paste0(path1, "/", path2, "/", path3, "/", file, "\n"))
httr_target <- httr::GET(paste0("http://", req$REMOTE_ADDR, ":", shiny_port, req$PATH_INFO));
httr_result <- httr::content(httr_target, "text")
# print(httr_result)
httr_result
}) %>%
plumber::pr_hook("exit", function(){
print("Stop the future cluster (bg workers)...")
tryCatch(
{
future_host <- shiny_app_future$workers[[shiny_app_future$node]]$host
print(future_host)
pid <- attr(future_host, "pid")
print(pid)
tools::pskill(pid)
},
finally = function () {
parallel::stopCluster(future_cluster)
}
)
print("Bye bye!")
}) %>%
plumber::pr_run(
host = "0.0.0.0",
port = 3000
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment