################################################
## Streaming through large tabular files in R ##
################################################
## Author: Thomas Girke
## Last update: 21-Dec-2018
## Utility: the function 'processFileInChunks' streams through a file in
## batches of lines and applies to each imported line batch the function
## assigned to the 'myFct' argument. The number of lines processed in each
## iteration is set with the 'n_rows' argument. As importer functions, both
## 'fread' from the 'data.table' package and the 'read_*' functions from the
## 'readr' package are supported. The results can be returned as list, vector
## or tabular objects. Depending on the chosen import function, the latter
## will be a 'tibble' or a 'data.table'. Alternatively, the processing results
## of each imported batch can be appended to a file after each iteration. This
## minimizes memory consumption since no large R objects are accumulated. For
## testing and debugging, the 'nchunks' option limits the number of chunks
## imported and processed. The function can also be used to simply import the
## data of tabular files without further processing by assigning
## 'myFct <- function(x) x'.
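## Minimal usage sketch of the arguments described above (a hypothetical call;
## assumes 'big.tsv' is a tab-delimited file with a header line)
# chunk_stats <- processFileInChunks(filepath="big.tsv", read_fct="read_tsv",
#                                    myFct=colSums, n_rows=5000,
#                                    returnformat="list", nchunks=Inf, verbose=TRUE)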
## Source function from R
# library(RCurl)
# source(textConnection(getURL("https://gist.githubusercontent.com/tgirke/6123d606e2f84989c8e55f8a176e5957/raw/processFileInChunks.R")))
## Updating this gist from command-line (only relevant for maintainer)
# git clone https://gist.github.com/6123d606e2f84989c8e55f8a176e5957.git
## -> make edits here
# git commit -am "some message"; git push -u origin master
processFileInChunks <- function(filepath, read_fct="read_tsv",
                                myFct=colSums, writetofile=NULL, n_rows=10,
                                skip=0, returnformat="list", nchunks=Inf,
                                verbose=TRUE) {
    ## Input validity checks
    if(any(c(class(filepath) != "character", length(filepath)!=1))) stop("Argument 'filepath' needs to be a character vector of length one.")
    if(!is.function(myFct)) stop("Argument 'myFct' needs to be a function.")
    if(skip!=0) warning("Argument 'skip' should be 0 in most cases.")
    if(!returnformat %in% c("list", "tabular", "vector")) stop("Argument 'returnformat' has to be one of 'list', 'tabular' or 'vector'.")
    ## Storage container
    bucket <- list()
    loopcounter <- 2 # the chunk read before the loop below counts as chunk 1
    ## Read first row chunk of file via import method defined under 'read_fct'
    if(read_fct=="fread") {
        require(data.table)
        suppressMessages(slice <- fread(filepath, nrows=n_rows, skip=skip, header=TRUE))
    } else if(read_fct %in% c("read_tsv", "read_delim", "read_csv", "read_csv2")) {
        require(readr); require(dplyr)
        suppressMessages(slice <- eval(parse(text=read_fct))(filepath, n_max=n_rows, skip=skip, col_names=TRUE))
    } else {
        stop("Value assigned to 'read_fct' can only be 'fread', 'read_tsv', 'read_delim', 'read_csv' or 'read_csv2'.")
    }
    ## To maintain original column titles, if possible
    if(!is.null(dim(slice))) mycolnames <- colnames(slice)
    ## Apply stats function provided by 'myFct' argument
    result <- myFct(as.matrix(slice))
    bucket <- c(bucket, list(result))
    ## Create names for aggregated line-wise results
    aggrname <- paste0("rows:", as.character(skip+1), "-", as.character(skip+nrow(slice)))
    names(bucket)[length(bucket)] <- aggrname
    if(verbose==TRUE) print(paste("Processed lines:", aggrname))
    ## Process subsequent row chunks in loop
    while(n_rows == nrow(slice)) {
        skip <- skip + n_rows
        if(read_fct=="fread") {
            suppressMessages(slice <- fread(filepath, nrows=n_rows, skip=skip+1, header=FALSE))
        } else if(read_fct %in% c("read_tsv", "read_delim", "read_csv", "read_csv2")) {
            suppressMessages(slice <- eval(parse(text=read_fct))(filepath, n_max=n_rows, skip=skip+1, col_names=FALSE))
        } else {
            stop("Value assigned to 'read_fct' can only be 'fread', 'read_tsv', 'read_delim', 'read_csv' or 'read_csv2'.")
        }
        ## To maintain original column titles, if possible
        if(!is.null(dim(slice))) colnames(slice) <- mycolnames[1:ncol(slice)]
        ## Apply stats function provided by 'myFct' argument
        result <- myFct(as.matrix(slice))
        bucket <- c(bucket, list(result))
        ## Create names for aggregated line-wise results
        aggrname <- paste0("rows:", as.character(skip+1), "-", as.character(skip+nrow(slice)))
        names(bucket)[length(bucket)] <- aggrname
        ## Status statement
        if(verbose==TRUE) print(paste("Processed lines:", aggrname))
        ## Write results to file in each iteration to keep memory usage low
        if(!is.null(writetofile)) {
            ## First iteration: 'bucket' holds the results of the initial chunk plus
            ## the current one; write the first without append and the second with it
            if(length(bucket)==2) {
                ## vector data
                if(is.null(dim(bucket[[1]]))) {
                    bucket[[1]] <- bind_cols(RowIndex=rep(names(bucket[1]), length(bucket[[1]])), data.frame(Stats=bucket[[1]]))
                    write_tsv(as.data.frame(bucket[[1]]), writetofile)
                    bucket[[2]] <- bind_cols(RowIndex=rep(names(bucket[2]), length(bucket[[2]])), data.frame(Stats=bucket[[2]]))
                    write_tsv(as.data.frame(bucket[[2]]), writetofile, append=TRUE)
                ## matrix data
                } else {
                    colnames(bucket[[1]]) <- mycolnames
                    bucket[[1]] <- bind_cols(RowIndex=rep(names(bucket[1]), nrow(bucket[[1]])), data.frame(bucket[[1]]))
                    write_tsv(as.data.frame(bucket[[1]]), writetofile)
                    colnames(bucket[[2]]) <- mycolnames
                    bucket[[2]] <- bind_cols(RowIndex=rep(names(bucket[2]), nrow(bucket[[2]])), data.frame(bucket[[2]]))
                    write_tsv(as.data.frame(bucket[[2]]), writetofile, append=TRUE)
                }
            }
            ## Subsequent iterations: append the single current chunk result
            if(length(bucket)==1) {
                ## vector data
                if(is.null(dim(bucket[[1]]))) {
                    bucket[[1]] <- bind_cols(RowIndex=rep(names(bucket[1]), length(bucket[[1]])), data.frame(Stats=bucket[[1]]))
                    write_tsv(as.data.frame(bucket[[1]]), writetofile, append=TRUE)
                ## matrix data
                } else {
                    colnames(bucket[[1]]) <- mycolnames
                    bucket[[1]] <- bind_cols(RowIndex=rep(names(bucket[1]), nrow(bucket[[1]])), data.frame(bucket[[1]]))
                    write_tsv(as.data.frame(bucket[[1]]), writetofile, append=TRUE)
                }
            }
            ## Reset container so results are not kept in memory
            bucket <- list()
        }
        ## Stop streaming process after specific number of iterations
        ## specified under 'nchunks'. This is useful for testing!
        if(loopcounter >= nchunks) { break }
        loopcounter <- loopcounter + 1
    }
    ## Return results as R object instead of file
    if(is.null(writetofile)) {
        ## As a list
        if(returnformat=="list") {
            return(bucket)
        ## As a tibble or data.table
        } else if(returnformat=="tabular") {
            if(is.null(dim(bucket[[1]]))) {
                tmp <- bind_cols(RowIndex=rep(names(bucket), sapply(bucket, length)), as_tibble(unlist(bucket)))
            } else {
                tmp <- do.call("rbind", bucket)
                if(read_fct %in% c("read_tsv", "read_delim", "read_csv", "read_csv2")) {
                    if("tbl" %in% class(tmp)) tmp <- bind_cols(RowIndex=names(bucket), tmp)
                    if(is.matrix(tmp)) tmp <- bind_cols(RowIndex=names(bucket), as_tibble(tmp))
                }
            }
            if(read_fct=="fread") {
                if(is.matrix(tmp)) tmp <- cbind(RowIndex=names(bucket), as.data.table(tmp))
                if(is.tbl(tmp)) tmp <- as.data.table(tmp)
            }
            return(tmp)
        ## As a vector
        } else if(returnformat=="vector") {
            return(unlist(bucket))
        } else {
            stop("Unsupported value assigned to 'returnformat'.")
        }
    }
}
## Usage:
## Create test file
library(readr)
df <- data.frame(C1=rnorm(10001), C2=rnorm(10001), C3=rnorm(10001), C4=rnorm(10001))
write_tsv(df, "testfile")
## Some example functions to assign to 'myFct' (each assignment below overwrites the previous one; pick one before running the examples)
myFct <- function(x) tail(x, 1)
myFct <- function(x) x # for reading in data without processing
myFct <- rowSums
myFct <- function(x) t(as.matrix(colSums(x)))
myFct <- function(x) sapply(seq_along(x[,1]), function(i) t.test(x[i,1:2], x[i,3:4])$p.value)
myFct <- max
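## A custom per-chunk function also works, since 'myFct' receives each chunk as
## a matrix. The following row filter is only an illustration with an arbitrary cutoff.
# myFct <- function(x) x[rowSums(x) > 2, , drop=FALSE]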
## Run processFileInChunks with list output
(r <- processFileInChunks(filepath="testfile", read_fct="read_tsv",
                          myFct=myFct, n_rows=1000,
                          returnformat="list", verbose=TRUE))
do.call("rbind", r)
## Run processFileInChunks with tibble/data.table output
(r <- processFileInChunks(filepath="testfile", read_fct="read_tsv",
                          myFct=myFct, n_rows=1000,
                          returnformat="tabular", verbose=TRUE))
## Run processFileInChunks with vector output
(r <- processFileInChunks(filepath="testfile", read_fct="read_tsv",
                          myFct=myFct, n_rows=1000,
                          returnformat="vector", verbose=TRUE))
## Run processFileInChunks with file output
processFileInChunks(filepath="testfile", read_fct="fread", nchunks=Inf,
                    myFct=myFct, writetofile="zzz", n_rows=1000,
                    returnformat="tabular", verbose=TRUE)
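## Optional sanity check (a sketch using 'testfile' from above): summing the
## per-chunk column sums should reproduce the column sums of the full file.
# myFct <- function(x) t(as.matrix(colSums(x)))
# chunked <- processFileInChunks(filepath="testfile", read_fct="read_tsv",
#                                myFct=myFct, n_rows=1000, returnformat="list")
# all.equal(colSums(do.call("rbind", chunked)), colSums(read_tsv("testfile")))
## The results appended to file can be inspected with, e.g., read_tsv("zzz").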