@dwerbam
Last active October 24, 2021 08:52
run pyspark copy/move from R
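library(processx)
library(stringr)

# Copy (or move, when move=TRUE) fsFrom to fsTo by piping a generated
# Python script into the pyspark shell. Returns the pyspark exit status.
runPysparkCopy <- function(fsFrom, fsTo, move=FALSE) {
  pyscript_file <- tempfile(tmpdir="/tmp")
  on.exit(unlink(pyscript_file), add=TRUE) # remove the temporary script on exit
  copy_or_move <- if(move) "mv" else "cp"
  # The Python lines must stay at column 0. The third argument of
  # dbutils.fs.cp / dbutils.fs.mv turns on recursion.
  writeLines(stringr::str_interp("
from pyspark.dbutils import DBUtils
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
sc = SparkContext('local')
spark = SparkSession(sc)
print(type(spark))
dbutils = DBUtils(spark)
dbutils.fs.${copy_or_move}('${fsFrom}', '${fsTo}', True)
sc.stop()"), pyscript_file)
  message("*********************************** ", if(move) "Moving" else "Copying",
          " with R. From: ", fsFrom, " to: ", fsTo)
  # Feed the script to pyspark on stdin and capture its stdout
  p <- processx::process$new("bash", c("-c", "pyspark"), stdin=pyscript_file, stdout="|")
  p$read_all_output_lines() # blocks until pyspark closes its stdout
  p$wait() # make sure the exit status is available
  p$get_exit_status()
}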
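
A minimal usage sketch, assuming `runPysparkCopy` has been sourced and `pyspark` (with `pyspark.dbutils` available) is on the `PATH`. The two `dbfs:/` paths are hypothetical placeholders, not paths from the original gist.

```r
# Hypothetical source and target locations; replace with real paths.
src <- "dbfs:/tmp/example_source"
dst <- "dbfs:/tmp/example_target"

# Recursive copy; pass move=TRUE to move instead of copy.
status <- runPysparkCopy(src, dst)

# The function returns the exit status of the pyspark process,
# so anything other than 0 is treated as a failure here.
if (!isTRUE(status == 0)) {
  stop("pyspark copy failed with exit status: ", status)
}
```

Because the paths are interpolated into single-quoted Python string literals, a path that itself contains a single quote would break the generated script.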