Last active: April 3, 2019 20:58
Save szilard/dd15162516410db34794 to your computer and use it in GitHub Desktop.
SparkR vs data.table - aggregate 100M records
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
data.table vs SparkR
group-by aggregate on 100M records (1M groups)
data.table: 6.5 sec (without key) / 1.3 sec (with key) - all on 1 core
SparkR (cached): 200 sec (8 cores)
=> SparkR is 30x / 150x slower (240x / 1200x per core)
Update (Spark 1.5.0): SparkR (cached) 50 sec (8 cores)
=> 8x / 40x slower (60x / 300x per core)
##################################################
## Setup
##
## Hardware: m3.2xlarge, 8 cores, 30G RAM
## R 3.2.0
## data.table 1.9.4
## Spark 1.4.0 (Update: Spark 1.5.0)
##################################################

##################################################
## Generate data
##################################################

# Generate 100M rows -- x: group id drawn from 1M groups, y: uniform value --
# and write them to d.csv. Fixed seed for reproducibility.
# Takes ~4 minutes; output file is ~2.4G.
time R --vanilla --quiet << EOF
set.seed(123)
n <- 100e6
m <- 1e6
d <- data.frame(x = sample(m, n, replace=TRUE), y = runif(n))
write.table(d, file = "d.csv", row.names = FALSE, sep = ",")
EOF
# real  3m50.790s

du -h d*csv
# 2.4G  d.csv
##################################################
## data.table
##################################################

library(data.table)

## Read the 100M-row CSV with fread (data.table's fast CSV reader).
system.time(
  d <- fread("d.csv")
)
#    user  system elapsed
#  25.741   0.416  26.159

## Group-by aggregate WITHOUT a key: mean of y per group x,
## then print the top 5 groups by mean.
system.time(
  print(head(d[, list(ym=mean(y)), by=x][order(-ym)],5))
)
#   user  system elapsed
#  6.280   0.192   6.473

## Set a key on x (sorts the table in place); subsequent grouped
## operations on x can then use the sorted order.
system.time(
  setkey(d, x)
)
#   user  system elapsed
#  7.086   0.260   7.346

## Same aggregate WITH the key in place -- ~5x faster than unkeyed.
system.time(
  print(head(d[, list(ym=mean(y)), by=x][order(-ym)],5))
)
#   user  system elapsed
#  1.196   0.080   1.276
##################################################
## SparkR
##################################################

## Launch the sparkR shell (run from the OS shell, not from R):
## ./spark-1.4.0-bin-hadoop2.4/bin/sparkR --driver-memory 15G --executor-memory 15G --packages com.databricks:spark-csv_2.10:1.0.3
## Update (1.5.0):
## ./spark-1.5.0-bin-hadoop2.4/bin/sparkR --driver-memory 15G --executor-memory 15G --packages com.databricks:spark-csv_2.11:1.2.0

## Read the CSV into a Spark DataFrame via the spark-csv data source.
d <- read.df(sqlContext, "d.csv", "com.databricks.spark.csv", header="true")

## Cache the DataFrame and force materialization with count(),
## so the aggregation below is timed against cached data.
system.time({
  cache(d)
  count(d)
})
#   user  system elapsed
#  0.001   0.000 152.830

## Group-by aggregate: mean of y per group x, top rows by mean descending.
system.time({
  dd <- agg(group_by(d, d$x), ym = mean(d$y))
  print(head( arrange(dd, desc(dd$ym)) ))
})
#   user  system elapsed
#  0.015   0.001 203.079
# Update (1.5.0): 0.021 0.000 48.199
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.