Skip to content

Instantly share code, notes, and snippets.

# Parse benchmark configuration from the command line.
args <- commandArgs(trailingOnly = TRUE)
api  <- as.integer(args[1])  # 1-rxExecBy, 2-gapply
size <- as.integer(args[2])  # size of data set, million rows
algo <- as.integer(args[3])  # 1-rxSummary, 2-rxLogit, 3-mean
key  <- as.integer(args[4])  # 1-DayOfWeek, 2-Dest, 3-Origin+Dest

# Lookup tables indexed by the integer codes above.
keys <- list(
  "DayOfWeek",
  "Dest",
  c("Origin", "Dest")
)
apis  <- list("rxExecBy", "gapply")
algos <- list("rxSummary", "rxLogit", "mean")
formulas <- list(
  ArrDelay15 ~ UniqueCarrier + Origin + Dest,
  ArrDelay15 ~ UniqueCarrier + Origin + DayOfWeek
)
# Connect to Spark with explicit resource sizing for the benchmark cluster.
cc <- rxSparkConnect(reset = TRUE,
                     nameNode = "wasb://benchmark@airlinecranrscaler.blob.core.windows.net/",
                     driverMem = "4g", numExecutors = 4, executorCores = 5,
                     executorMem = "16g", executorOverheadMem = "4g")

# prepare hive data 1M, 2M, 5M, 10M, 100M
hdfs <- RxHdfsFileSystem(hostName = "wasb://benchmark@airlinecranrscaler.blob.core.windows.net/")

# 1M-row CSV -> Hive table airOT1M.
# Fix: use TRUE rather than T (T is a reassignable binding, not a keyword).
airText <- RxTextData("wasb://benchmark@airlinecranrscaler.blob.core.windows.net/airline/csv/airOT1M",
                      firstRowIsColNames = TRUE, fileSystem = hdfs)
airHive <- RxHiveData(table = "airOT1M")
rxDataStep(airText, airHive)

# 2M-row CSV source.
# NOTE(review): the matching RxHiveData("airOT2M")/rxDataStep step appears to
# be missing or truncated in this snippet — confirm against the full gist.
airText <- RxTextData("wasb://benchmark@airlinecranrscaler.blob.core.windows.net/airline/csv/airOT2M",
                      firstRowIsColNames = TRUE, fileSystem = hdfs)
# NOTE(review): snippet truncated by the page splice — the `levels` vector and
# the enclosing list() are cut off after "Thursday"; this fragment is not
# syntactically complete as shown. Recover the full snippet from the gist.
cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
# Column metadata: import DayOfWeek as a factor with explicit weekday levels
# (presumably Monday..Sunday — the tail is cut off; confirm).
colInfo <- list(
DayOfWeek = list(
type = "factor",
levels = c(
"Monday",
"Tuesday",
"Wednesday",
"Thursday",
# NOTE(review): snippet truncated — the `levels` vector and enclosing list()
# are cut off after "Wednesday"; not syntactically complete as shown.
cc <- rxSparkConnect(reset = TRUE)
# Column metadata: ArrDelay forced numeric, DayOfWeek a factor.
# NOTE(review): level order here is "Monday", "Sunday", "Wednesday" — unusual
# ordering; verify it is intentional in the full gist before reuse.
colInfo <- list(
ArrDelay = list(
type = "numeric"),
DayOfWeek = list(
type = "factor",
levels = c(
"Monday",
"Sunday",
"Wednesday",
# NOTE(review): snippet truncated — the `levels` vector and enclosing list()
# are cut off after "Thursday"; not syntactically complete as shown.
cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
# Column metadata: import DayOfWeek as a factor with explicit weekday levels.
colInfo <- list(
DayOfWeek = list(
type = "factor",
levels = c(
"Monday",
"Tuesday",
"Wednesday",
"Thursday",
# NOTE(review): snippet truncated — the body of .Message is cut off inside the
# `if` block (braces unclosed); recover the remainder from the gist.
cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
# XDF copy of the AirlineDemoSmall data set on HDFS.
xdfData <- RxXdfData(file = "/share/AirlineDemoSmall/AirlineDemoSmallXDF", fileSystem = hdfsFileSystem)
# Per-group UDF: imports the partition and warns when it exceeds 80000 rows.
.Message <- function(keys, data)
{
df <- rxImport(data)
rows <- nrow(df)
if (rows > 80000)
{
warning("rows > 80000")
# Connect to Spark and point at the airline demo CSV on HDFS.
cc <- rxSparkConnect(reset = TRUE)
hdfsFileSystem <- RxHdfsFileSystem()
textData <- RxTextData(
  file = "/share/AirlineDemoSmall/AirlineDemoSmall.csv",
  missingValueString = "M",
  stringsAsFactors = TRUE,
  fileSystem = hdfsFileSystem
)

# Per-group UDF: import the partition and return its row count.
.Summary <- function(keys, data) {
  nrow(rxImport(data))
}
# single key
@kenthzhang
kenthzhang / rxExecByPrepare.R
Last active April 27, 2017 00:01
RevoScaleR rxExecBy Guide
# Stage the sample CSV in HDFS: create the target directory and copy in the
# file shipped with the RevoScaleR package.
# NOTE(review): `source` masks base::source() — consider renaming (kept here
# in case later code references it).
source <- system.file("SampleData/AirlineDemoSmall.csv", package = "RevoScaleR")
myDir <- "/share/AirlineDemoSmall"
rxHadoopMakeDir(myDir)
rxHadoopCopyFromLocal(source, myDir)

cc <- rxSparkConnect(reset = TRUE)

# Summary of data
hdfsFileSystem <- RxHdfsFileSystem()
@kenthzhang
kenthzhang / rxExecByPerfData.csv
Last active April 28, 2017 23:29
Performance, RevoScaleR rxExecBy vs SparkR gapply
Keys API Size UDF Timing
DayOfWeek rxExecBy 1 rxSummary 28.098
DayOfWeek rxExecBy 1 rxLogit 93.798
DayOfWeek rxExecBy 1 mean 26.99
DayOfWeek rxExecBy 2 rxSummary 30.31
DayOfWeek rxExecBy 2 rxLogit 81.456
DayOfWeek rxExecBy 2 mean 34.307
DayOfWeek rxExecBy 5 rxSummary 36.308
DayOfWeek rxExecBy 5 rxLogit 116.653
DayOfWeek rxExecBy 5 mean 62.225