Skip to content

Instantly share code, notes, and snippets.

@xiaodaigh
Created August 28, 2017 22:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xiaodaigh/9b95ee573bc79837bc97e22f20941bda to your computer and use it in GitHub Desktop.
Save xiaodaigh/9b95ee573bc79837bc97e22f20941bda to your computer and use it in GitHub Desktop.
R code for reading and processing .feather data chunkwise
# featherc
library(data.table)
library(feather)
library(future)
library(dplyr)
# `multiprocess` is a deprecated alias in future. It meant "forked multicore
# where supported, otherwise background R sessions"; spell that out here.
future_strategy <- if (supportsMulticore()) multicore else multisession
plan(future_strategy)
# Allow arbitrarily large globals (e.g. whole data.tables) to be exported to
# the workers.
options(future.globals.maxSize = Inf)
# Read `feather_file` into memory and convert it to a data.table in place.
# NOTE(review): this looks unfinished:
#   - `parts` is never used.
#   - no `j` is supplied to `[`, so `by` presumably does no grouping here.
#   - the timing from system.time() is computed but discarded.
# Confirm the intended behaviour before relying on it.
split_feather <- function(feather_file, by = NULL, parts = parallel::detectCores()) {
  system.time({
    dt <- feather::read_feather(feather_file)
  })
  setDT(dt)
  dt[, by = by]
}
# Split `inputdata` into `chunks` groups and write each group to its own
# feather file under ".featherc/<outloc>/" (relative to the working directory).
#
# inputdata: data.frame / data.table; converted to a data.table by setDT().
# outloc:    sub-directory of ".featherc" that receives the chunk files.
# by:        grouping column(s). Rows with the same `by` values go to the same
#            chunk; groups are assigned a chunk id by `.GRP %% chunks`.
#            NULL puts every row into a single chunk.
# chunks:    number of chunk ids; defaults to max(4, physical cores).
#
# Returns an object of class "featherc": a list with `locs` (chunk file paths,
# relative to `wd`), `wd` (working directory at call time) and `chunks`.
split_featherc <- function(inputdata, outloc, by = NULL,
                           chunks = max(4, parallel::detectCores(logical = FALSE))) {
  out_dir <- file.path(".featherc", outloc)
  # Also creates ".featherc" itself; silent when the directories already exist.
  dir.create(out_dir, recursive = TRUE, showWarnings = FALSE)

  setDT(inputdata)
  # Tag every `by` group with its chunk id. The column is added by reference,
  # so drop it again on exit to avoid leaving it in the caller's table.
  inputdata[, grp := .GRP %% chunks, keyby = by]
  on.exit(inputdata[, grp := NULL], add = TRUE)

  tfiles <- inputdata[, {
    # .BY is a one-element list holding this group's chunk id.
    tfile <- file.path(out_dir, .BY[[1L]])
    write_feather(.SD, tfile)
    tfile
  }, by = grp]

  res <- list(locs = tfiles$V1, wd = getwd(), chunks = chunks)
  class(res) <- "featherc"
  res
}
# Driver: split "a.feather" into chunk files under ".featherc/a/", grouped by
# ACCOUNT_ID.
inputdata <- feather::read_feather("a.feather")
by <- "ACCOUNT_ID"
# NOTE(review): `parts` is not used by the call below — confirm it is needed.
parts <- parallel::detectCores(logical = FALSE)
outfile <- "a"
system.time(fut <- split_featherc(inputdata, outloc = outfile, by = by))
# Scratch check of lazy evaluation: inside `b`, `d` is an unevaluated promise
# for `a + 1` that can be inspected from the debugger. Only enter the debugger
# in an interactive session, so batch runs (e.g. Rscript) are not interrupted.
a <- 1
b <- function(d) {
  if (interactive()) {
    browser()
  }
}
b(a + 1)
# library(parallel)
# cl <- parallel::makeCluster(2)
# stopCluster(cl)
# dplyr::filter() method for "featherc" objects: applies `criterion` to every
# chunk file, launching one future per chunk.
#
# fcdf:      a "featherc" object as returned by split_featherc().
# criterion: an unquoted row condition, used as data.table `i` on each chunk.
# outloc:    currently unused.
#            NOTE(review): each filtered chunk is written back over its own
#            source file, so the original chunks are replaced. `outloc` was
#            presumably meant to be the destination — confirm.
#
# Returns a list of futures, one per chunk. Each future's value is the
# filtered chunk, written to disk when the future runs.
filter.featherc <- function(fcdf, criterion,
                            outloc = tempfile(tmpdir = file.path(getwd(), ".featherc"))) {
  crit <- substitute(criterion)
  # Build the subsetting call directly rather than deparse() -> sprintf() ->
  # parse(). deparse() can split long expressions over several lines, which
  # breaks that round-trip.
  subset_call <- bquote(chunk[.(crit)])
  # Chunk paths are stored relative to the directory split_featherc() ran in;
  # fall back to the current directory if no `wd` was recorded.
  base_dir <- if (is.null(fcdf[["wd"]])) getwd() else fcdf[["wd"]]
  chunk_paths <- file.path(base_dir, fcdf[["locs"]])
  lapply(chunk_paths, function(dpath) {
    future({
      chunk <- read_feather(dpath)
      setDT(chunk)
      chunk <- eval(subset_call)
      write_feather(chunk, dpath)
    })
  })
}
# Filter every chunk in parallel. filter.featherc() returns the list of
# futures directly (it does not set a "futures" attribute), so check those.
system.time(aa <- filter(fut, RISK_GRADE == 10))
vapply(aa, resolved, logical(1))
# test <- function(list_of_vectors, filter_criterion) {
# featherc
library(data.table)
library(future)
# Two small overlapping toy tables for trying out the filter helpers below.
list_of_dt <- list(data.table(criteria = 1:5), data.table(criteria = 2:6))
options(future.debug = FALSE)
# Subset every element of `list_of_df` by the unquoted row condition
# `criterion`, in parallel via future_lapply(). Each element is converted
# with setDT() and evaluated as `df[criterion]`.
# NOTE(review): future_lapply() was moved to the future.apply package; newer
# versions of future no longer provide it — confirm which package supplies it.
filter_vec_future <- function(list_of_df, criterion) {
  crit <- substitute(criterion)
  # Build the call once instead of deparse()/parse(), which breaks on
  # expressions that deparse over several lines.
  subset_call <- bquote(df[.(crit)])
  future_lapply(list_of_df, function(df) {
    setDT(df)
    eval(subset_call)
  })
}
# Try the parallel filter on the toy tables (the object is `list_of_dt`).
filter_vec_future(list_of_dt, criteria >= 5)
# Timing demo: creating a future returns without waiting for the 5 s sleep
# when a parallel plan is active. value() blocks until the result is ready.
system.time(a <- future({
  Sys.sleep(5)
  1
}))
system.time(b <- future({
  Sys.sleep(5)
  2
}))
value(a)
value(b)
# Same idea for a list of futures. Avoid naming the list `c`, which would
# mask base::c in the global environment.
system.time(sleep_futures <- lapply(1:2, function(x) {
  future({
    Sys.sleep(5)
    x
  })
}))
lapply(sleep_futures, value)
# Sequential version: subset every element of `list_of_df` by the unquoted
# row condition `criterion`, after converting each one with setDT().
filter_vec <- function(list_of_df, criterion) {
  crit <- substitute(criterion)
  # Build the call once instead of deparse()/parse(), which breaks on
  # expressions that deparse over several lines.
  subset_call <- bquote(df[.(crit)])
  lapply(list_of_df, function(df) {
    setDT(df)
    eval(subset_call)
  })
}
# Try the sequential filter on the toy tables (the object is `list_of_dt`).
filter_vec(list_of_dt, criteria >= 5)
# Future-based version (replaces the sequential filter_vec above): returns a
# list of futures, one per element of `list_of_df`. Each future evaluates
# `df[criterion]` on that element after setDT().
filter_vec <- function(list_of_df, criterion) {
  crit <- substitute(criterion)
  subset_call <- bquote(df[.(crit)])
  lapply(list_of_df, function(df) {
    setDT(df)
    # `df` only appears inside the language object, so static globals
    # detection cannot see it. Export it explicitly, and attach data.table so
    # the worker dispatches to `[.data.table`.
    future(
      eval(subset_call),
      globals = list(df = df, subset_call = subset_call),
      packages = "data.table"
    )
  })
}
# Launch one filtering future per toy table, then collect their values.
system.time(afu <- filter_vec(list_of_dt, criteria >= 5))
aval <- lapply(afu, value)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment