Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
# Positional command-line arguments, all parsed as integers:
#   1. api  - 1 = rxExecBy, 2 = gapply
#   2. size - size of the data set, in million rows
#   3. algo - 1 = rxSummary, 2 = rxLogit, 3 = mean
#   4. key  - 1 = DayOfWeek, 2 = Dest, 3 = Origin + Dest
# A missing argument parses to NA.
args <- commandArgs(trailingOnly = TRUE)
api <- as.integer(args[1])
size <- as.integer(args[2])
algo <- as.integer(args[3])
key <- as.integer(args[4])

# Lookup tables indexed by the integer codes above.
apis <- list("rxExecBy", "gapply")
algos <- list("rxSummary", "rxLogit", "mean")
keys <- list(
  "DayOfWeek",
  "Dest",
  c("Origin", "Dest")
)

# Model formulas for the rxLogit workload. Only two entries exist, so only
# the first two key codes have a formula.
formulas <- list(
  ArrDelay15 ~ UniqueCarrier + Origin + Dest,
  ArrDelay15 ~ UniqueCarrier + Origin + DayOfWeek
)
# Run the selected workload through rxExecBy on a Spark compute context.
#
# Reads the script-level globals `size`, `algo`, `key`, `keys` and `formulas`.
# Returns the elapsed time reported by system.time() for the rxExecBy call
# only; connecting and disconnecting are not timed.
# Stops with an error, before any Spark session is opened, when `algo` or
# `key` does not name a supported workload.
.rxExecBy <- function() {
  # Validate first so a bad argument never costs a Spark session start-up.
  if (is.na(algo) || !(algo %in% 1:3)) {
    stop("Unsupported algo: ", algo,
         " (expected 1-rxSummary, 2-rxLogit, 3-mean)", call. = FALSE)
  }
  if (is.na(key) || key < 1L || key > length(keys)) {
    stop("Unsupported key: ", key, call. = FALSE)
  }
  if (algo == 2L && key > length(formulas)) {
    stop("No rxLogit formula is defined for key ", key, call. = FALSE)
  }

  cc <- rxSparkConnect(
    reset = TRUE,
    nameNode = "wasb://benchmark@airlinecranrscaler.blob.core.windows.net/",
    driverMem = "4g",
    numExecutors = 4,
    executorCores = 5,
    executorMem = "16g",
    executorOverheadMem = "4g"
  )
  # Disconnect even when the workload below fails, so the session is not
  # leaked.
  on.exit(rxSparkDisconnect(cc), add = TRUE)

  colInfo <- list(
    UniqueCarrier = list(type = "factor"),
    Origin = list(type = "factor"),
    Dest = list(type = "factor"),
    DayOfWeek = list(type = "factor")
  )
  text <- RxHiveData(table = paste0("airOT", size, "M"), colInfo = colInfo)

  if (algo == 1L) {
    # Per-group summary of every column.
    .Summary <- function(data, keys) {
      rxSummary(~ ., data = data)
    }
    runtime <- system.time({
      result <- rxExecBy(text, keys[[key]], .Summary)
    })[["elapsed"]]
    # print(result)
  } else if (algo == 2L) {
    # Per-group logistic regression; ArrDelay15 is derived on the fly from
    # ArrDelay.
    .Model <- function(data, keys, formula) {
      rxLogit(
        formula = formula,
        data = data,
        transformFunc = function(data) {
          data$ArrDelay15 <- data$ArrDelay > 15
          data
        },
        transformVars = c("ArrDelay")
      )
    }
    runtime <- system.time({
      result <- rxExecBy(
        text,
        keys[[key]],
        func = .Model,
        funcParam = list(formula = formulas[[key]])
      )
    })[["elapsed"]]
  } else {
    # algo == 3: per-group mean of every column after reading the group into
    # a data frame.
    .Mean <- function(data, keys) {
      df <- rxDataStep(data, maxRowsByCols = NULL)
      lapply(df, mean)
    }
    runtime <- system.time({
      result <- rxExecBy(text, keys[[key]], .Mean)
    })[["elapsed"]]
    # print(result)
  }

  runtime
}
# Group function for the rxLogit workload: fits
# ArrDelay15 ~ UniqueCarrier + Origin + Dest on one group's rows.
#
# key:  the grouping key value(s) for this group.
# data: the rows of this group as a data frame.
# Returns a one-row data frame holding the key and the fitted model (the
# model is wrapped in I(list(...)) so it is stored as a single list cell).
.Model1 <- function(key, data) {
  # library() rather than require(): a missing package must fail here with a
  # clear error instead of returning FALSE and failing later at rxLogit().
  library(RevoScaleR)

  for (col in c("UniqueCarrier", "Origin", "Dest", "DayOfWeek")) {
    data[[col]] <- factor(data[[col]])
  }

  model <- rxLogit(
    formula = ArrDelay15 ~ UniqueCarrier + Origin + Dest,
    data = data,
    # The response is derived on the fly: TRUE when ArrDelay exceeds 15.
    transformFunc = function(data) {
      data$ArrDelay15 <- data$ArrDelay > 15
      data
    },
    transformVars = c("ArrDelay")
  )
  data.frame(key = key, model = I(list(model)))
}
# Group function for the rxLogit workload: fits
# ArrDelay15 ~ UniqueCarrier + Origin + DayOfWeek on one group's rows.
#
# key:  the grouping key value(s) for this group.
# data: the rows of this group as a data frame.
# Returns a one-row data frame holding the key and the fitted model (the
# model is wrapped in I(list(...)) so it is stored as a single list cell).
.Model2 <- function(key, data) {
  # library() rather than require(): a missing package must fail here with a
  # clear error instead of returning FALSE and failing later at rxLogit().
  library(RevoScaleR)

  for (col in c("UniqueCarrier", "Origin", "Dest", "DayOfWeek")) {
    data[[col]] <- factor(data[[col]])
  }

  model <- rxLogit(
    formula = ArrDelay15 ~ UniqueCarrier + Origin + DayOfWeek,
    data = data,
    # The response is derived on the fly: TRUE when ArrDelay exceeds 15.
    transformFunc = function(data) {
      data$ArrDelay15 <- data$ArrDelay > 15
      data
    },
    transformVars = c("ArrDelay")
  )
  data.frame(key = key, model = I(list(model)))
}
# rxLogit group functions for the gapply path, looked up by position.
gapply.models <- list(
  .Model1,
  .Model2
)
# Run the selected workload through SparkR's gapply.
#
# Reads the script-level globals `size`, `algo`, `key`, `keys` and
# `gapply.models`.
# Returns the elapsed time reported by system.time() for collect() only.
# NOTE(review): presumably collect() is what triggers the grouped computation
# in Spark, which is why only it is timed - confirm.
# Stops with an error, before SparkR is loaded, when `algo` or `key` does not
# name a supported workload.
.gapply <- function() {
  # Validate first so a bad argument never costs a Spark session start-up.
  if (is.na(algo) || !(algo %in% 1:3)) {
    stop("Unsupported algo: ", algo,
         " (expected 1-rxSummary, 2-rxLogit, 3-mean)", call. = FALSE)
  }
  if (is.na(key) || key < 1L || key > length(keys)) {
    stop("Unsupported key: ", key, call. = FALSE)
  }
  if (algo == 2L && key > length(gapply.models)) {
    stop("No rxLogit model function is defined for key ", key, call. = FALSE)
  }

  library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
  sparkConfig <- list(
    spark.driver.memory = "4g",
    spark.executor.instances = 4L,
    spark.executor.cores = 5L,
    spark.executor.memory = "16g",
    spark.yarn.executor.memoryOverhead = "4g"
  )
  sparkR.session(master = "yarn", sparkConfig = sparkConfig)
  # Stop the session even when gapply()/collect() fails, so it is not leaked.
  on.exit(sparkR.stop(), add = TRUE)

  schema <- structType(structField("R", "binary"))
  df <- sql(paste0("FROM airOT", size, "M SELECT *"))

  if (algo == 1L) {
    # Per-group summary of every column.
    .Summary <- function(key, data) {
      # library() rather than require(): fail loudly if the package is
      # missing.
      library(RevoScaleR)
      for (col in c("UniqueCarrier", "Origin", "Dest", "DayOfWeek")) {
        data[[col]] <- factor(data[[col]])
      }
      model <- rxSummary(~ ., data = data)
      data.frame(key = key, model = I(list(model)))
    }
    result.df <- gapply(df, keys[[key]], .Summary, schema)
    runtime <- system.time({
      result <- collect(result.df)
    })[["elapsed"]]
    # result <- lapply(result$R, unserialize)
    # print(lapply(result, as.list))
  } else if (algo == 2L) {
    # Per-group logistic regression; the model function depends on the key.
    result.df <- gapply(df, keys[[key]], gapply.models[[key]], schema)
    runtime <- system.time({
      result <- collect(result.df)
    })[["elapsed"]]
  } else {
    # algo == 3: per-group mean of every column.
    .Mean <- function(key, data) {
      for (col in c("UniqueCarrier", "Origin", "Dest", "DayOfWeek")) {
        data[[col]] <- factor(data[[col]])
      }
      result <- lapply(data, mean)
      data.frame(key = key, model = I(list(result)))
    }
    result.df <- gapply(df, keys[[key]], .Mean, schema)
    runtime <- system.time({
      result <- collect(result.df)
    })[["elapsed"]]
    # result <- lapply(result$R, unserialize)
    # print(lapply(result, as.list))
  }

  runtime
}
# Entry point: validate the parsed arguments, run the chosen benchmark and
# print a single "[PERFORMANCE]" result line.
#
# Validation happens up front so that a bad code fails immediately instead of
# after the benchmark has already run (e.g. at apis[[api]] below).
if (is.na(api) || api < 1L || api > length(apis)) {
  stop("Unsupported api: ", api, " (expected 1-rxExecBy, 2-gapply)",
       call. = FALSE)
}
if (is.na(size)) {
  stop("size (million rows) must be an integer", call. = FALSE)
}
if (is.na(algo) || algo < 1L || algo > length(algos)) {
  stop("Unsupported algo: ", algo,
       " (expected 1-rxSummary, 2-rxLogit, 3-mean)", call. = FALSE)
}
if (is.na(key) || key < 1L || key > length(keys)) {
  stop("Unsupported key: ", key,
       " (expected 1-DayOfWeek, 2-Dest, 3-Origin+Dest)", call. = FALSE)
}

# Plain if/else: `api` is a scalar, so ifelse() is the wrong tool for
# choosing which benchmark to run.
runtime <- if (api == 1L) .rxExecBy() else .gapply()

print(paste(
  "[PERFORMANCE]",
  paste(keys[[key]], collapse = "+"),
  apis[[api]],
  paste0(size, "M"),
  algos[[algo]],
  paste0(runtime, "s"),
  sep = ", "
))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment