Skip to content

Instantly share code, notes, and snippets.

@mskyttner
Last active November 16, 2023 00:34
Show Gist options
  • Save mskyttner/46601e60138a886cdfbfc81067c71725 to your computer and use it in GitHub Desktop.
Save mskyttner/46601e60138a886cdfbfc81067c71725 to your computer and use it in GitHub Desktop.
Shell wrapper for duckdb to support reading from stdin
#!/usr/bin/env Rscript
# usage for example:
# cat data/mydatafile.tsv | head -n 1000 | ./duckstream.R --sql "select mycolumn from stdin;"
library(optparse)
library(readr)
suppressPackageStartupMessages(library(duckdb))
# Command line interface ----
# Declares the options accepted by the script and parses the command line into
# `opt`, a list keyed by long option name (e.g. opt$input, opt$`no-header`).
# NOTE(review): the numeric short flags "-1" and "-0" may not be registered by
# optparse as short flags (it appears to expect a letter after the dash) --
# confirm; the long forms are declared alongside them in any case.
option_list <- list(
  make_option(c("-i", "--input"),
    action = "store", default = "stdin",
    help = "Input file (tab separated values/tsv) to read [default 'stdin']"
  ),
  make_option(c("-1", "--no-header"),
    action = "store_true", default = FALSE,
    help = "Do not treat the first row of the input as a header with column names; instead generate column names 'X1, X2, ...'"
  ),
  make_option(c("-d", "--delim"),
    action = "store", default = "\t",
    help = "Delimiter used [default is '\\t']"
  ),
  make_option(c("-Q", "--sql"),
    action = "store", default = "select * from stdin limit 10;",
    help = "Query to issue after loading [default is '%default']",
    metavar = "select * from stdin limit 10;"
  ),
  make_option(c("-o", "--output"),
    action = "store", default = ":memory:",
    help = "Output file to use [default is '%default']",
    metavar = ":memory:"
  ),
  make_option(c("-D", "--dump"),
    action = "store", default = NULL,
    help = "Generate dump file in parquet format at specified directory"
  ),
  make_option(c("--lookahead"),
    action = "store", default = 500L,
    help = "Lookahead; number of lines to read when inferring column types [default %default]",
    metavar = "500"
  ),
  make_option(c("--chunksize"),
    action = "store", default = 50000L,
    help = "Chunk size in lines when batch loading data [default %default]",
    metavar = "50000"
  ),
  make_option(c("-q", "--quiet"),
    action = "store_true", default = FALSE,
    help = "Do not print default query results to stdout"
  ),
  # The flag suppresses the header row, so the help text must say so (it
  # previously claimed the opposite).
  make_option(c("-0", "--hide-header"),
    action = "store_true", default = FALSE,
    help = "Do not show column names as header in output"
  )
)
opt <- parse_args(OptionParser(option_list = option_list))
# open input connection; the value of --input is handed to file(), so the
# default "stdin" makes the script read from standard input
stdin <- file(opt$input)
open(stdin)
# open duckdb (dbdir is the --output value, ":memory:" by default)
con <- duckdb::dbConnect(duckdb::duckdb(dbdir = opt$output))
# on.exit() only registers handlers for an enclosing function call; at the top
# level of a script there is none, so the cleanup would never run. Register a
# finalizer on the global environment that fires when the R session ends.
# Each step is wrapped in try() so one failing cleanup does not block the other.
invisible(reg.finalizer(
  globalenv(),
  function(env) {
    try(close(stdin), silent = TRUE)
    try(duckdb::dbDisconnect(con, shutdown = TRUE), silent = TRUE)
  },
  onexit = TRUE
))
# shared state, updated by the reader functions through `<<-`
is_eof <- FALSE     # TRUE once a read returned fewer lines than requested
is_header <- FALSE
i <- 0              # running count of lines read from the input
has_noheader <- isTRUE(opt$`no-header`)
# Read the first `n_size` lines from the input connection and let readr infer
# column names and column types from them.
#
# n_size: number of lines to read for type inference (default: --lookahead).
# delim:  field delimiter (default: --delim).
#
# Side effects: sets the globals `i` (number of lines read so far) and
# `is_eof` (TRUE when fewer than `n_size` lines were available).
#
# Returns a list with `data` (the parsed lines as a tibble) and `colspec`
# (the readr column specification of that tibble).
read_header <- function(n_size = opt$lookahead, delim = opt$delim) {
  lines <- readLines(stdin, n = n_size)
  i <<- length(lines)
  is_eof <<- i < n_size
  if (i == 0L) {
    stop("No input lines available to infer a header from", call. = FALSE)
  }
  # Hand readr one newline-terminated string marked with I() so it is always
  # taken as literal data. A bare character vector without newlines can be
  # interpreted as file path(s) instead (e.g. when only one line was read).
  text <- I(paste0(paste(lines, collapse = "\n"), "\n"))
  # With --no-header readr generates the column names itself.
  header <- readr::read_delim(text, delim = delim, col_names = !has_noheader)
  list(data = header, colspec = readr::spec(header))
}
# Create the target table from the data frame obtained during header
# inference.
#
# header:    data frame to load.
# temp:      name under which the data frame is temporarily registered.
# tablename: table to create (only if it does not exist yet).
#
# Returns, invisibly, is.na() of the dbExecute() result.
write_header <- function(header, temp = "header", tablename = "stdin") {
  duckdb::duckdb_register(con, temp, header)
  # Drop the temporary registration again once the table has been created;
  # otherwise it stays in the catalog for the rest of the session.
  # NOTE(review): a left-over registered view is presumably what makes
  # `export database` fail with "Cannot convert VIEW to SQL" -- confirm.
  on.exit(duckdb::duckdb_unregister(con, temp), add = TRUE)
  sql <- sprintf(
    "create table if not exists %s as select * from %s;", tablename, temp
  )
  invisible(is.na(DBI::dbExecute(con, sql)))
}
# Read the next batch of up to `n_size` lines from the input connection and
# parse it with the column names and types taken from `colspec`.
#
# n_size:  maximum number of lines to read (default: --chunksize).
# colspec: readr column specification; its `cols` element supplies both the
#          column names and the column types.
#
# Side effects: adds the number of lines read to the global `i` and sets the
# global `is_eof` to TRUE when fewer than `n_size` lines were available.
#
# Returns the parsed batch as returned by readr::read_delim().
read_chunk <- function(n_size = opt$chunksize, colspec) {
  batch <- readLines(stdin, n = n_size)
  n_read <- length(batch)
  i <<- i + n_read
  is_eof <<- n_read < n_size
  # The extra "\n" element is appended to the lines before they are parsed.
  readr::read_delim(
    c(batch, "\n"),
    delim = opt$delim,
    col_names = names(colspec$cols),
    col_types = colspec$cols
  )
}
# Append one parsed batch to the target table.
#
# x:         data frame holding the batch.
# temp:      name under which the batch is temporarily registered.
# tablename: table the rows are inserted into.
#
# Returns, invisibly, all(is.na(...)) over the results of the register,
# insert and unregister calls.
write_chunk <- function(x, temp = "chunk", tablename = "stdin") {
  c1 <- duckdb::duckdb_register(con, temp, x)
  sql <- sprintf("insert into %s select * from %s;", tablename, temp)
  c2 <- DBI::dbExecute(con, sql)
  # Unregister the name that was actually registered above; the literal
  # "chunk" used before was only right for the default value of `temp`.
  c3 <- duckdb::duckdb_unregister(con, temp)
  invisible(all(is.na(c(c1, c2, c3))))
}
# Read one batch of up to `n` lines using `colspec` and insert it into the
# table; returns whatever write_chunk() returns.
process_chunk <- function(colspec, n = opt$chunksize) {
  write_chunk(read_chunk(n_size = n, colspec = colspec))
}
# Load the whole input into the database: infer the column types from the
# first lines, create the table from them, then insert the rest batch by
# batch inside one transaction and finish with a checkpoint.
#
# Returns is.na() of the result of the final "checkpoint;" statement.
process_stream <- function() {
  # guess/infer column types and create the table from the inspected lines
  inferred <- read_header()
  write_header(inferred$data)
  # move the remaining input into duckdb chunkwise
  DBI::dbExecute(con, "begin transaction;")
  repeat {
    if (is_eof) break
    process_chunk(colspec = inferred$colspec)
  }
  DBI::dbExecute(con, "commit;")
  checkpoint_res <- DBI::dbExecute(con, "checkpoint;")
  is.na(checkpoint_res)
}
# Main ----
# Load the input, optionally print the query result, optionally export the
# database, then exit with the status derived from the load step.
is_done <- process_stream()
if (!isTRUE(opt$quiet)) {
  out <- DBI::dbGetQuery(con, opt$sql)
  show_names <- !isTRUE(opt$`hide-header`)
  tsv <- readr::format_delim(out, delim = opt$delim, col_names = show_names)
  # write() appends its own newline, so drop the trailing one from the text
  tsv <- sub("\n$", "", tsv)
  write(tsv, stdout())
}
if (!is.null(opt$dump)) {
  #TODO work around "Error: Not implemented Error: Cannot convert VIEW to SQL because it was not created with a SQL statement"
  # Results are assigned so nothing is auto-printed to stdout at top level.
  res <- DBI::dbExecute(con, "create table export as select * from stdin;")
  res <- DBI::dbExecute(con, "drop table stdin;")
  # Double any single quote so the path cannot terminate the SQL literal.
  dump_dir <- gsub("'", "''", opt$dump, fixed = TRUE)
  # dbExecute() signals an error on failure and returns a row count on
  # success, so testing the result with is.na() reported a failure after
  # every successful export. Translate real errors instead.
  res <- tryCatch(
    DBI::dbExecute(
      con, sprintf("export database '%s' (format parquet);", dump_dir)
    ),
    error = function(e) {
      stop("Export failed! ", conditionMessage(e), call. = FALSE)
    }
  )
}
# is_done is a logical; make the conversion to an exit status explicit
q(status = as.integer(is_done))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment