Skip to content

Instantly share code, notes, and snippets.

@retrography
Forked from russellpierce/parallelRDS.R
Last active May 20, 2022 12:45
Show Gist options
  • Save retrography/359e0cc56d2cf1acd161b5645bc801a8 to your computer and use it in GitHub Desktop.
Save retrography/359e0cc56d2cf1acd161b5645bc801a8 to your computer and use it in GitHub Desktop.
Parallelize RDS compression/decompression to improve serialization performance in R
# The functions below use parallelized versions of gzip, xz, and bzip2 to
# improve compression/decompression performance of RDS serialization in R.
# Each function searches for the appropriate program (based on the required
# compression format) and if found, offloads the compression handling to the
# external program and therefore leaves R free to do the data import/export.
# The two main functions (saveRDS and readRDS) mask R's native read and write
# functions. The functions have only been tested on macOS, but they should work
# on any Linux/Unix system.
#
# Requires the following packages: pxz, pbzip2, and pigz.
#
# Run the following line at the command prompt before using the functions.
#
# brew install pxz pbzip2 pigz
#
#! Todo: Use saveRDS's original structure from base (create new gzfile, etc. functions instead of creating specialized saveRDS functions)
#! Todo: Rename loadRDS and writeRDS to something appropriate
#! Note: Tested on Ubuntu and it all works, but pxz doesn't compile on macOS for now while pbzip2 hangs up on macOS. No idea about Windows for now.
library(parallel)
# Report whether each external program named in `cmd` can be located by
# Sys.which(). Returns an unnamed logical vector with one element per entry
# of `cmd` (TRUE when a non-empty path was found).
cmdAvail <- function(cmd) {
  path_lengths <- nchar(Sys.which(cmd))
  unname(path_lengths != 0L)
}
# Serialize `object` to the connection `con`, always closing the connection
# afterwards.
#
# Args:
#   object: The R object to serialize.
#   con:    A connection open for binary writing (the callers in this file
#           pass a pipe to an external compressor).
#
# Returns NULL invisibly. Errors and warnings raised by base::saveRDS() are
# propagated to the caller instead of being printed and swallowed, so a failed
# save can no longer be mistaken for a successful one.
writeRDS <- function(object, con) {
  # on.exit guarantees the connection is released on success and on failure.
  on.exit(close(con), add = TRUE)
  base::saveRDS(object, file = con)
  invisible(NULL)
}
# Deserialize one R object from the connection `con`, always closing the
# connection afterwards.
#
# Args:
#   con: A connection readable in binary mode (the callers in this file pass
#        a pipe from an external decompressor).
#
# Returns the deserialized object. Errors and warnings raised by
# base::readRDS() are propagated to the caller; previously they were caught,
# printed, and the printed message string was returned in place of the data.
loadRDS <- function(con) {
  # on.exit guarantees the connection is released on success and on failure.
  on.exit(close(con), add = TRUE)
  base::readRDS(file = con)
}
# Save `object` as an xz-compressed RDS file, using the parallel `pxz` tool
# when it is available and falling back to base::saveRDS(compress = "xz")
# otherwise.
#
# Args:
#   object:            The R object to serialize.
#   file:              Path of the file to write.
#   threads:           Number of threads passed to pxz via -T.
#   compression_level: Compression level passed to pxz (e.g. 6 becomes "-6").
saveRDS.xz <-
  function(object,
           file,
           threads = parallel::detectCores(),
           compression_level = 6) {
    if (cmdAvail("pxz")) {
      # The path is interpolated into a shell command, so it must be quoted:
      # otherwise spaces or shell metacharacters in `file` break (or hijack)
      # the command. path.expand() keeps "~" working once the path is quoted.
      command <- paste0(
        "pxz -c -k -T",
        threads,
        " -",
        compression_level,
        " > ",
        shQuote(path.expand(file))
      )
      writeRDS(object, pipe(command, "wb"))
    } else {
      base::saveRDS(
        object,
        file = file,
        compress = "xz"
      )
    }
  }
# Read an xz-compressed RDS file, decompressing through the parallel `pxz`
# tool when it is available and falling back to base::readRDS() otherwise.
#
# Args:
#   file:    Path of the file to read.
#   threads: Number of threads passed to pxz via -T.
#
# Returns the deserialized object.
readRDS.xz <-
  function(file,
           threads = parallel::detectCores()) {
    if (cmdAvail("pxz")) {
      # Quote the path before handing it to the shell so that spaces or shell
      # metacharacters in `file` cannot break (or hijack) the command.
      # path.expand() keeps "~" working once the path is quoted.
      command <- paste0(
        "pxz -d -k -c -T",
        threads,
        " ",
        shQuote(path.expand(file))
      )
      loadRDS(pipe(command))
    } else {
      base::readRDS(file)
    }
  }
# Save `object` as a gzip-compressed RDS file, using the parallel `pigz` tool
# when it is available and falling back to base::saveRDS(compress = "gzip")
# otherwise.
#
# Args:
#   object:            The R object to serialize.
#   file:              Path of the file to write.
#   threads:           Number of threads passed to pigz via -p.
#   compression_level: Compression level passed to pigz (e.g. 6 becomes "-6").
saveRDS.gz <-
  function(object,
           file,
           threads = parallel::detectCores(),
           compression_level = 6) {
    if (cmdAvail("pigz")) {
      # The path is interpolated into a shell command, so it must be quoted:
      # otherwise spaces or shell metacharacters in `file` break (or hijack)
      # the command. path.expand() keeps "~" working once the path is quoted.
      command <- paste0(
        "pigz -c -k -p",
        threads,
        " -",
        compression_level,
        " > ",
        shQuote(path.expand(file))
      )
      writeRDS(object, pipe(command, "wb"))
    } else {
      base::saveRDS(
        object,
        file = file,
        compress = "gzip"
      )
    }
  }
# Read a gzip-compressed RDS file, decompressing through the parallel `pigz`
# tool when it is available and falling back to base::readRDS() otherwise.
#
# Args:
#   file:    Path of the file to read.
#   threads: Number of threads passed to pigz via -p.
#
# Returns the deserialized object.
readRDS.gz <-
  function(file,
           threads = parallel::detectCores()) {
    if (cmdAvail("pigz")) {
      # Quote the path before handing it to the shell so that spaces or shell
      # metacharacters in `file` cannot break (or hijack) the command.
      # path.expand() keeps "~" working once the path is quoted.
      command <- paste0(
        "pigz -d -k -c -p",
        threads,
        " ",
        shQuote(path.expand(file))
      )
      loadRDS(pipe(command))
    } else {
      base::readRDS(file)
    }
  }
# Save `object` as a bzip2-compressed RDS file, using the parallel `pbzip2`
# tool when it is available and falling back to
# base::saveRDS(compress = "bzip2") otherwise.
#
# Args:
#   object:            The R object to serialize.
#   file:              Path of the file to write.
#   threads:           Number of threads passed to pbzip2 via -p.
#   compression_level: Compression level passed to pbzip2 (e.g. 9 becomes "-9").
saveRDS.bz2 <-
  function(object,
           file,
           threads = parallel::detectCores(),
           compression_level = 9) {
    if (cmdAvail("pbzip2")) {
      # The path is interpolated into a shell command, so it must be quoted:
      # otherwise spaces or shell metacharacters in `file` break (or hijack)
      # the command. path.expand() keeps "~" working once the path is quoted.
      command <- paste0(
        "pbzip2 -c -k -p",
        threads,
        " -",
        compression_level,
        " > ",
        shQuote(path.expand(file))
      )
      writeRDS(object, pipe(command, "wb"))
    } else {
      base::saveRDS(
        object,
        file = file,
        compress = "bzip2"
      )
    }
  }
# Read a bzip2-compressed RDS file, decompressing through the parallel
# `pbzip2` tool when it is available and falling back to base::readRDS()
# otherwise.
#
# Args:
#   file:    Path of the file to read.
#   threads: Number of threads passed to pbzip2 via -p.
#
# Returns the deserialized object.
readRDS.bz2 <-
  function(file,
           threads = parallel::detectCores()) {
    if (cmdAvail("pbzip2")) {
      # Quote the path before handing it to the shell so that spaces or shell
      # metacharacters in `file` cannot break (or hijack) the command.
      # path.expand() keeps "~" working once the path is quoted.
      command <- paste0(
        "pbzip2 -d -k -c -p",
        threads,
        " ",
        shQuote(path.expand(file))
      )
      loadRDS(pipe(command))
    } else {
      base::readRDS(file)
    }
  }
# Drop-in replacement for base::readRDS() that detects the compression format
# of `file` and dispatches to the matching parallel reader.
#
# Args:
#   file:    Path of the RDS file to read. Anything that is not a character
#            path (e.g. a connection) is handed straight to base::readRDS().
#   threads: Number of threads forwarded to the format-specific reader.
#
# Returns the deserialized object. Stops with "<file> does not exist!" when the
# path is missing.
readRDS <-
  function(file,
           threads = parallel::detectCores()) {
    # A connection cannot be sniffed or passed to an external tool by name.
    if (!is.character(file)) {
      return(base::readRDS(file))
    }
    if (!file.exists(file)) {
      stop(
        paste0(
          file,
          " does not exist!"
        )
      )
    }
    # Detect the format from the file's leading magic bytes. The previous
    # approach grepped the output of the `file` utility, which echoes the
    # (unquoted) path: a path containing "gzip", "XZ" or "bzip2" was
    # mis-detected, and two matches produced a length-2 `if` condition.
    header <- readBin(file, what = "raw", n = 6L)
    has_magic <- function(signature) {
      length(header) >= length(signature) &&
        identical(header[seq_along(signature)], signature)
    }
    format <-
      if (has_magic(as.raw(c(0x1f, 0x8b)))) {
        "gzip"
      } else if (has_magic(as.raw(c(0xfd, 0x37, 0x7a, 0x58, 0x5a, 0x00)))) {
        "XZ"
      } else if (has_magic(charToRaw("BZh"))) {
        "bzip2"
      } else {
        "none"
      }
    switch(
      format,
      gzip = readRDS.gz(file, threads = threads),
      XZ = readRDS.xz(file, threads = threads),
      bzip2 = readRDS.bz2(file, threads = threads),
      base::readRDS(file)
    )
  }
# Drop-in replacement for base::saveRDS() that routes compression through the
# parallel writers defined in this file.
#
# Args:
#   object:   The R object to serialize.
#   file:     Path of the file to write. Anything that is not a character path
#             (e.g. a connection) is handed straight to base::saveRDS().
#   compress: TRUE, "gz" or "gzip" for gzip; "bzip", "bzip2", "bz" or "bz2" for
#             bzip2; "xz", "7zip" or "7z" for xz; FALSE for no compression.
#
# Stops with "<compress> is not a recognized compression method!" for any other
# value of `compress`.
saveRDS <-
  function(object,
           file = "",
           compress = TRUE) {
    # A connection cannot be redirected to by name from an external tool.
    if (!is.character(file)) {
      return(base::saveRDS(object, file))
    }
    if (compress %in% c(TRUE, "gz", "gzip")) {
      saveRDS.gz(object, file)
    } else if (compress %in% c("bzip", "bzip2", "bz", "bz2")) {
      saveRDS.bz2(object, file)
    } else if (compress %in% c("xz", "7zip", "7z")) {
      saveRDS.xz(object, file)
    } else if (compress == FALSE) {
      # compress = FALSE must be forwarded explicitly: base::saveRDS()
      # defaults to compress = TRUE and would gzip the output otherwise.
      base::saveRDS(object, file, compress = FALSE)
    } else {
      stop(paste0(compress, " is not a recognized compression method!"))
    }
  }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment