Skip to content

Instantly share code, notes, and snippets.

@gaborcsardi
Last active March 18, 2019 15:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gaborcsardi/a3e4eb9fc554353fc43307598f793a0c to your computer and use it in GitHub Desktop.
Save gaborcsardi/a3e4eb9fc554353fc43307598f793a0c to your computer and use it in GitHub Desktop.
---
title: "Overview of Tools for External (R) Processes"
output:
html_document:
df_print: paged
---
## processx - run external processes
```{r}
library(processx)
```
### `processx::run()`
```{r}
px <- processx:::get_tool("px")
px
```
Uses `$PATH`. Standard output and error are collected by default.
```{r}
run("/bin/ls")
run("ls")
```
```{r}
pxhelp <- run(px, "--help")
cat(pxhelp$stderr)
```
No quoting
```{r}
run(px, c("outln", "arg - with spaces", "outln", "'arg with quote'"))
```
Interruption.
```{r, eval = FALSE}
run("sleep", "5")
```
Spinner
```{r, eval = FALSE}
run(px, c("sleep", "5"), spinner = TRUE)
```
Timeout
```{r, error = TRUE}
run(px, c("sleep", "5"), timeout = 1)
```
Error if exit value is non-zero:
```{r, error = TRUE}
run(px, c("return", "10"))
```
Echo standard output and error
```{r}
outp <- run("ls", "..", echo = TRUE)
```
Setting environment variables, no quoting is needed.
```{r}
run(px, c("getenv", "FOO"), env = c(Sys.getenv(), FOO = "bar"))
```
Redirect standard error to standard output
```{r}
run(px, c("out", "one\n", "err", "two\n", "out", "three\n\n"),
stderr_to_stdout = TRUE, echo = TRUE)
```
Standard output and error callbacks
```{r}
cb <- function(line, proc) {
cat("Got:", line, "\n")
if (line == "done") proc$kill(close_connections = FALSE)
}
result <- run(px,
c("outln", "this", "outln", "that", "outln", "done",
"outln", "still here", "sleep", "10", "outln", "dead by now"),
stdout_line_callback = cb,
error_on_status = FALSE)
```
### `processx::process`
Start a process in the background, manipulate it later.
```{r}
names(processx::process$public_methods)
```
```{r}
proc <- process$new(px, c("sleep", "1000"))
proc
```
```{r}
proc$is_alive()
```
```{r}
proc$get_name()
```
```{r}
proc$get_exe()
```
```{r}
proc$get_cmdline()
```
```{r}
proc$get_status()
```
```{r}
proc$get_username()
```
```{r}
proc$get_wd()
```
```{r}
proc$get_cpu_times()
```
```{r}
proc$get_memory_info()
```
Manipulate a process
```{r}
proc$suspend()
proc$get_status()
proc$resume()
proc$get_status()
```
```{r}
proc$interrupt()
proc
```
```{r}
proc <- process$new(px, c("sleep", "1000"))
proc
```
```{r}
proc$kill()
```
Automatic cleanup
```{r}
proc <- process$new(px, c("sleep", "1000"))
proc
```
```{r}
library(dplyr)
ps::ps() %>%
filter(name == "px")
```
```{r}
rm(proc)
gc()
```
```{r}
ps::ps() %>%
filter(name == "px")
```
Standard output and error
```{r}
proc <- process$new("curl", "https://httpbin.org/delay/1", stdout = "|")
proc$read_output()
proc$poll_io(3000)
proc$read_output_lines()
```
```{r}
proc$poll_io(-1)
```
```{r}
proc$is_alive()
proc$get_exit_status()
```
```{r}
p1 <- process$new("curl", "https://httpbin.org/delay/2", stdout = "|")
p2 <- process$new("curl", "https://httpbin.org/delay/1", stdout = "|")
poll(list(p1, p2), -1)
p2$read_output_lines()
poll(list(p1, p2), -1)
```
```{r}
poll(list(p1, p2), -1)
p1$read_all_output_lines()
```
### Back pressure
If the parent process does not read out the standard output/error pipes,
the pipe buffer will fill up, and the background process stops. _This is
a good thing._ It makes sure that the background process only produces
data at a rate that the parent process can handle. Once the parent reads
out the pipe buffer, the background process resumes.
## callr - run external R processes
### `callr::r()`
```{r}
library(callr)
Sys.getpid()
r(function() Sys.getpid())
```
Lexical scope is lost:
```{r, error = TRUE}
a <- 100
r(function() a + 100)
```
Always create an anonymous function, in case the function you want to call has a non-trivial scope.
```{r, error = TRUE}
r(.libPaths)
```
```{r}
r(function() .libPaths())
```
Errors are copied over from the subprocess:
```{r, error = TRUE}
r(function() 1 + "A")
```
They can include the stack.
```{r}
tryCatch(
r(function() { f <- function() g(); g <- function() 1 + "A"; f() },
error = "stack"),
error = function(e) print(e$stack)
)
```
### `callr::rcmd()`
For running `R CMD <command>` with standard output and error streaming.
### `callr::rscript()
For running `Rscript with standard output and error streaming.
### `callr::r_bg()`
Like `callr::r()` but run the R subprocess in the background. Returns
`callr::r_process` which inherits from `processx::process`.
```{r}
rp <- r_bg(function() { Sys.sleep(1); 1 + 1 } )
rp$wait()
rp$get_result()
```
### `callr::r_process`
You can also use the `r_process` R6 class directly, e.g. to inherit
from it.
### `callr::r_session`
This is a persistent R process, running in the background. You can send
computation to it. This is what pak uses to perform all of its operations.
```{r}
rs <- r_session$new()
rs
```
```{r}
class(rs)
```
Operations
```{r}
names(r_session$public_methods)
```
`run()` is synchronous.
```{r}
rs$run(function() Sys.getpid())
```
`call()` is asynchronous.
```{r}
rs$call(function() Sys.getpid())
rs
```
```{r}
rs$poll_io(-1)
```
```{r}
rs$read()
```
```{r}
rs$call(function() {
print(1:200)
message("done")
"exited"
})
rs$poll_io(-1)
result <- rs$read()
```
```{r}
result
```
Attaching to the terminal of the backgound process. This will be
used for "live" debugging in the future.
```{r}
x <- rs$run(function() library(purrr))
x <- rs$run(function() .GlobalEnv$x <- 100)
```
```{r, eval = FALSE}
rs$attach()
# Interrupt to detach
```
```{r}
rs$close()
rs
```
## ps - query and manipulate processes
Query and manipulate system processes
```{r}
library(ps)
ls("package:ps")
```
```{r}
ps_is_supported()
```
```{r}
me <- ps_handle()
me
```
```{r}
ps_exe(me)
```
List all processes
```{r}
ps::ps()
```
Query process information: pid, exe, cmdline, parent process, child
proceses, creation time, etc.
Open files:
```{r}
ps_open_files(me)
```
Network connections
```{r}
ps_connections(me)
```
Environment variables:
```{r}
ps_environ(me)[c("TERM", "USER", "SHELL", "R_HOME")]
```
Process manipulation: suspend, resume, send signal (UNIX), interrupt, kill.
## Process (tree) Cleanup
Mark a process tree by setting a unique environment variable. When
starting a process:
1. Set unique (random) env var in `processx::process`.
2. This variable is inherited in all of its child, grandchild, etc.
processes.
To clean up a process tree:
1. Enumerate all system processes.
2. Check their environment for the unique env var.
3. If it is set, then eliminate them.
```{r, eval = FALSE}
processx::run(..., cleanup_tree = TRUE)
processx::process$new(..., cleanup_tree = TRUE)
callr::r(..., cleanup_tree = TRUE)
etc.
```
`cleanup_tree` defaults to `FALSE` currently, to be changed later.
## Cleanup reporter for testthat
For every `test_that()` block check that newly started processes were
cleaned up, and all open files and connections were closed.
In `testthat.r`:
```{r, eval = FALSE}
library(testthat)
library(mypackage)
if (ps::ps_is_supported()) {
reporter <- ps::CleanupReporter(testthat::ProgressReporter)$new()
} else {
## ps does not support this platform
reporter <- "progress"
}
test_check("mypackage", reporter = reporter)
```
By default it generates a test failure, and closes the resource. It can
also just close silently, if that's preferred.
## Messages, Progress Bars, etc.
We want to be able to send back messages from the subprocess, to create
status bars, progress bars, etc. Right now this is implemented for
`callr::r_process`. Message conditions of class `callr_message` are
caught in the subprocess and transmitted through another connection to the
parent process.
```{r, error = TRUE}
do <- function() {
msg <- structure(list(message = "hi"),
class = c("callr_message", "condition"))
signalCondition(msg)
signalCondition(msg)
stop("nah-ah")
}
rs <- r_session$new()
withCallingHandlers(
rs$run(do),
callr_message = function(m) print(m))
```
The clipapp package uses this mechanism, to seamlessly transmit cli
messages to the parent.
```{r}
rs <- r_session$new()
cliapp::start_app(theme = cliapp::simple_theme())
cliapp::cli_div(theme = list(par = list("margin-bottom" = 1)))
res <- rs$run(function() {
cliapp::cli_h1("Title")
cliapp::cli_h2("Subtitle")
cliapp::cli_par()
cliapp::cli_text(cliapp:::lorem_ipsum())
cliapp::cli_end()
cliapp::cli_alert_danger("Watch out!")
cliapp::cli_alert_success("Done.")
invisible()
})
```
Note that the theming happens in the parent process!
Global message handlers:
```{r}
do <- function() {
cliapp::cli_div()
cliapp::cli_h1("title")
cliapp::cli_text("text")
}
rs <- callr::r_session$new()
msgs <- list()
withr::with_options(list(
cli.default_handler = function(msg) {
msgs <<- c(msgs, list(msg))
if (!is.null(findRestart("muffleMessage"))) {
invokeRestart("muffleMessage")
}
}),
res <- rs$run(do)
)
msgs
```
## async - integrated framework for async computation and IO
```{r}
library(async)
afun <- async(function() {
when_all(
delay = delay(1/1000)$
then(function() 1),
http = http_get("https://eu.httpbin.org/status/418")$
then(function(x) x$status_code),
process = run_process("pwd")$
then(function(x) trimws(x$stdout)),
r_process = run_r_process(function() 2)$
then(function(x) x$result),
call = call_function(function() 3)$
then(function(x) x$result)
)
})
synchronise(afun())
```
## Summary
External subprocesses: processx.
External R subprocesses: callr.
System processes (i.e. not only subprocesses): ps.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment