Skip to content

Instantly share code, notes, and snippets.

@lucacerone
Forked from daroczig/impala.R
Created November 4, 2016 11:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lucacerone/d13ebd3fe30dc27b69b2be1096adab3a to your computer and use it in GitHub Desktop.
Save lucacerone/d13ebd3fe30dc27b69b2be1096adab3a to your computer and use it in GitHub Desktop.
Connect and query Impala via SSH and impala-shell from R
#' Connect and query Impala via SSH and impala-shell
#'
#' Runs `query` through `impala-shell` on a remote machine reached with
#' `ssh`, writes the result to a remote temporary CSV file, copies that file
#' to the local machine with `scp` and reads it into a data.table. Both
#' temporary files are removed when the function exits, whether it succeeds
#' or fails.
#'
#' @param query SQL query to run, as a single character string. Newlines are
#'   replaced by spaces so the query is sent as a one-liner.
#' @param host server hostname or IP, used verbatim in the `ssh` and `scp`
#'   command lines.
#' @param log enable or disable logging of debug/trace messages. Defaults to
#'   `TRUE` when the futile.logger package is installed.
#' @return data.table object holding the query result.
#' @examples
#' \dontrun{
#' query_impala('SELECT 1 AS x', host = 'impala-gateway')
#' }
#' @export
query_impala <- function(query, host = 'localhost',
                         log = requireNamespace('futile.logger',
                                                quietly = TRUE)) {
  ## validate the arguments before touching the network
  if (!is.character(query) || length(query) != 1L || is.na(query) ||
      !nzchar(query)) {
    stop('`query` must be a single non-empty character string.',
         call. = FALSE)
  }
  if (!is.character(host) || length(host) != 1L || is.na(host) ||
      !nzchar(host)) {
    stop('`host` must be a single non-empty character string.',
         call. = FALSE)
  }
  log <- isTRUE(log)

  ## measure time of query
  timer <- proc.time()

  ## make sure the query is a one-liner
  query <- gsub('\n', ' ', query, fixed = TRUE)

  ## try to connect and create a remote temp file
  if (log) {
    futile.logger::flog.trace('Connecting to %s and creating tempfile.', host)
  }
  rfn <- system(
    paste('ssh', host, shQuote('mktemp --tmpdir=/tmp rimpala-XXXXXXXX.csv')),
    intern = TRUE
  )

  ## error handling: with intern = TRUE a non-zero exit code is reported via
  ## the "status" attribute, and exactly one line (the path) is expected
  if (length(rfn) != 1L || !is.null(attr(rfn, 'status'))) {
    stop('SSH access error.', call. = FALSE)
  }
  if (log) futile.logger::flog.trace('Remote tempfile: %s', rfn)

  ## from here on both temp files must be removed however the function exits
  lfn <- tempfile(pattern = 'rimpala-', fileext = '.csv')
  on.exit({
    if (log) {
      futile.logger::flog.trace(
        'Deleting temporary files on localhost and remote.'
      )
    }
    unlink(lfn)
    system(paste('ssh', host, 'rm -f', rfn))
  }, add = TRUE)

  ## build impala-shell query
  cmd <- paste(
    'impala-shell -B --quiet -q',
    shQuote(query),
    '-o', shQuote(rfn),
    '"--output_delimiter=,"',
    '--print_header > /dev/null'
  )

  ## and log then run it; the command is passed as a format *argument* so
  ## that '%' characters in the SQL are never read as format directives
  if (log) {
    futile.logger::flog.trace('Querying impala-shell directly via: %s', cmd)
  }
  status <- system(paste('ssh', host, shQuote(cmd)))
  if (!identical(status, 0L)) {
    stop('impala-shell query failed with exit status ', status, '.',
         call. = FALSE)
  }

  ## copy CSV file to localhost (local path quoted: tempdir may hold spaces)
  if (log) {
    futile.logger::flog.trace(
      'Copying tempfile to temporary location on localhost: %s', lfn
    )
  }
  status <- system(paste0('scp -q -C ', host, ':', rfn, ' ', shQuote(lfn)))
  if (!identical(status, 0L)) {
    stop('Copying the result file via scp failed with exit status ', status,
         '.', call. = FALSE)
  }

  ## read data like a pro, falling back to read.csv if fread cannot parse it
  if (log) {
    futile.logger::flog.trace('Reading %s bytes of data.',
                              file.info(lfn)$size)
  }
  res <- tryCatch(
    data.table::fread(lfn, sep = ','),
    error = function(e) data.table::data.table(read.csv(lfn))
  )

  ## log query time
  if (log) {
    futile.logger::flog.trace(
      'Query finished with %s rows after %s seconds.',
      nrow(res),
      round((proc.time() - timer)[['elapsed']], 2)
    )
  }

  ## return
  res
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment