SparkR Error Collecting Large SparkDataFrame
library(arrow)
## Open source Apache Spark downloaded from this archive:
## https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
library(SparkR, lib.loc = "~/DatabricksTesting/spark-3.0.1-bin-hadoop2.7/R/lib/")
## $java -version
## openjdk version "1.8.0_212"
## OpenJDK Runtime Environment (build 1.8.0_212-8u212-b03-0ubuntu1.16.04.1-b03)
## OpenJDK 64-Bit Server VM (build 25.212-b03, mixed mode)
## Using databricks-connect 7.3.9 as the pyspark backend, with Python 3.7.5
## Connecting to a Databricks cluster running Databricks Runtime 7.3 LTS (includes Apache Spark 3.0.1, Scala 2.12)
## Installation/configuration steps followed: https://docs.databricks.com/dev-tools/databricks-connect.html
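## Hedged environment check (added, not part of the original run): confirm the JVM and the
## pyspark install used by databricks-connect are visible from this R session
# system("java -version")   ## expect openjdk 1.8.x, per the notes above
# dir.exists("/home/matthew14786/.local/lib/python3.7/site-packages/pyspark")   ## path used as SPARK_HOME below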
Sys.setenv(SPARK_HOME='/home/matthew14786/.local/lib/python3.7/site-packages/pyspark')
sc <- sparkR.session(
  sparkConfig = list(
    spark.databricks.service.address = "https://adb-5093516137124821.1.azuredatabricks.net/",
    spark.databricks.service.token = "<Token>",
    spark.databricks.service.clusterId = "1211-161851-undid610",
    spark.databricks.service.orgId = "5093516137124821",
    spark.databricks.service.port = "15001",
    spark.sql.execution.arrow.sparkr.enabled = "true",
    spark.driver.memory = "16g",
    spark.driver.maxResultSize = "16g",
    spark.ui.enabled = "false",
    spark.sql.inMemoryColumnarStorage.batchSize = as.integer(1e5),
    spark.sql.execution.arrow.maxRecordsPerBatch = as.integer(1e5),
    spark.submit.deployMode = "client"
  ),
  enableHiveSupport = FALSE
)
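## Hedged sanity check (added, not part of the original run): a trivial remote query to
## confirm the databricks-connect session is usable before going further
# head(SparkR::sql("SELECT 1 AS ok"))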
## Print out Spark Session Configuration
SparkR::sparkR.conf()
# spark.app.id "local-1614716631621"
# spark.app.name "SparkR"
# spark.driver.host "hprstudio01.hpinc.com"
# spark.driver.maxResultSize "16g"
# spark.driver.memory "16g"
# spark.driver.port "42842"
# spark.executor.id "driver"
# spark.executorEnv.LD_LIBRARY_PATH "$LD_LIBRARY_PATH:/opt/R/3.6.0/lib/R/lib::/lib:/usr/local/lib:/usr/lib/x86_64-linux-gnu:/usr/lib/jvm/java-8-openjdk-amd64/jre/lib/amd64/server"
# spark.home "/home/matthew14786/.local/lib/python3.7/site-packages/pyspark"
# spark.master "local[*]"
# spark.r.sql.derby.temp.dir "/tmp/RtmpsU70s6"
# spark.sql.execution.arrow.maxRecordsPerBatch "100000"
# spark.sql.execution.arrow.sparkr.enabled "true"
# spark.sql.inMemoryColumnarStorage.batchSize "100000"
# spark.submit.deployMode "client"
# spark.submit.pyFiles ""
# spark.ui.enabled "false"
# spark.ui.showConsoleProgress "true"
## Define dataframe row count (365 days * 86400 seconds/day = 31,536,000 rows)
L <- 365*86400
## Create a local dataframe with a single column containing sequential integer values from 1:L
Original <- data.frame(Index = seq_len(L))
## Copy the data frame to Spark cluster
Remote <- SparkR::as.DataFrame(Original)
## Define an arbitrary number of additional columns to add.
## In my environment, 7 extra columns (8 total) collect successfully,
## but 8 extra (9 total) throw the error message included below.
ExtraCols <- 8
## Create additional columns by copying the Index column
for (i in seq_len(ExtraCols)) {
  Remote[[paste0("N", i)]] <- Remote[["Index"]]
}
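## Hedged alternative (added, not part of the original run): SparkR::withColumn() adds the
## same duplicated columns one at a time and should behave identically here
# for (i in seq_len(ExtraCols)) {
#   Remote <- SparkR::withColumn(Remote, paste0("N", i), Remote$Index)
# }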
## Print out structure of Remote Dataframe
str(Remote)
# 'SparkDataFrame': 9 variables:
# $ Index: int 1 2 3 4 5 6
# $ N1 : int 1 2 3 4 5 6
# $ N2 : int 1 2 3 4 5 6
# $ N3 : int 1 2 3 4 5 6
# $ N4 : int 1 2 3 4 5 6
# $ N5 : int 1 2 3 4 5 6
# $ N6 : int 1 2 3 4 5 6
# $ N7 : int 1 2 3 4 5 6
# $ N8 : int 1 2 3 4 5 6
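## Back-of-envelope size of the result to be collected (added estimate, not in the
## original run), assuming 4-byte integers and ignoring serialization overhead
L * (1 + ExtraCols) * 4 / 2^30
# ~1.06 GiB of raw integer data, in the same ballpark as the ~1 GiB compressed
# server response warned about below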
## Attempt to collect Spark DataFrame to local R data.frame
Local <- collect(Remote)
# 21/03/02 14:28:31 WARN SparkServiceRPCClient: Large server response (1073741824 bytes compressed)
# 21/03/02 14:28:42 WARN SparkServiceRPCClient: Unretryable exception for b5dc2967-84a0-4a1b-9c19-777cf2ef1b4b: java.lang.OutOfMemoryError: Java heap space
# Error in readBin(con, raw(), as.integer(dataLen), endian = "big") :
# invalid 'n' argument
# Exception in thread "serve-Arrow" java.lang.OutOfMemoryError: Java heap space
# at java.util.Arrays.copyOf(Arrays.java:3332)
# at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
# at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
# at java.lang.StringBuffer.append(StringBuffer.java:270)
# at org.apache.log4j.helpers.PatternParser$LiteralPatternConverter.format(PatternParser.java:419)
# at org.apache.log4j.PatternLayout.format(PatternLayout.java:506)
# at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310)
# at org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
# at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
# at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
# at org.apache.log4j.Category.callAppenders(Category.java:206)
# at org.apache.log4j.Category.forcedLog(Category.java:391)
# at org.apache.log4j.Category.log(Category.java:856)
# at org.slf4j.impl.Log4jLoggerAdapter.warn(Log4jLoggerAdapter.java:401)
# at org.apache.spark.internal.Logging.logWarning(Logging.scala:69)
# at org.apache.spark.internal.Logging.logWarning$(Logging.scala:68)
# at com.databricks.service.SparkServiceRPCClientStub.logWarning(SparkServiceRPCClientStub.scala:61)
# at com.databricks.service.SparkServiceRPCClient.executeRPC0(SparkServiceRPCClient.scala:92)
# at com.databricks.service.SparkServiceRemoteFuncRunner.withRpcRetries(SparkServiceRemoteFuncRunner.scala:234)
# at com.databricks.service.SparkServiceRemoteFuncRunner.executeRPC(SparkServiceRemoteFuncRunner.scala:156)
# at com.databricks.service.SparkServiceRemoteFuncRunner.executeRPCHandleCancels(SparkServiceRemoteFuncRunner.scala:287)
# at com.databricks.service.SparkServiceRemoteFuncRunner.$anonfun$execute0$1(SparkServiceRemoteFuncRunner.scala:118)
# at com.databricks.service.SparkServiceRemoteFuncRunner$$Lambda$894/3541916.apply(Unknown Source)
# at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
# at com.databricks.service.SparkServiceRemoteFuncRunner.withRetry(SparkServiceRemoteFuncRunner.scala:135)
# at com.databricks.service.SparkServiceRemoteFuncRunner.execute0(SparkServiceRemoteFuncRunner.scala:113)
# at com.databricks.service.SparkServiceRemoteFuncRunner.$anonfun$execute$1(SparkServiceRemoteFuncRunner.scala:86)
# at com.databricks.service.SparkServiceRemoteFuncRunner$$Lambda$2510/2059484210.apply(Unknown Source)
# at com.databricks.spark.util.Log4jUsageLogger.recordOperation(UsageLogger.scala:210)
# at com.databricks.spark.util.UsageLogging.recordOperation(UsageLogger.scala:346)
# at com.databricks.spark.util.UsageLogging.recordOperation$(UsageLogger.scala:325)
# at com.databricks.service.SparkServiceRPCClientStub.recordOperation(SparkServiceRPCClientStub.scala:61)
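## Hedged workaround sketch (added, not verified in the original run): collecting in
## slices by filtering on the Index column keeps each server response smaller,
## which may avoid the heap-space failure above
# ChunkSize <- 1e7
# Pieces <- lapply(seq(1, L, by = ChunkSize), function(start) {
#   collect(SparkR::filter(Remote, Remote$Index >= start & Remote$Index < start + ChunkSize))
# })
# Local <- do.call(rbind, Pieces)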
SparkR::sparkR.session.stop()
## R Environment
sessionInfo()
# R version 3.6.0 (2019-04-26)
# Platform: x86_64-pc-linux-gnu (64-bit)
# Running under: Ubuntu 16.04.6 LTS
#
# Matrix products: default
# BLAS: /usr/lib/libblas/libblas.so.3.6.0
# LAPACK: /usr/lib/lapack/liblapack.so.3.6.0
#
# locale:
# [1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C LC_TIME=en_US.UTF-8 LC_COLLATE=en_US.UTF-8 LC_MONETARY=en_US.UTF-8
# [6] LC_MESSAGES=en_US.UTF-8 LC_PAPER=en_US.UTF-8 LC_NAME=C LC_ADDRESS=C LC_TELEPHONE=C
# [11] LC_MEASUREMENT=en_US.UTF-8 LC_IDENTIFICATION=C
#
# attached base packages:
# [1] stats graphics grDevices utils datasets methods base
#
# other attached packages:
# [1] SparkR_3.0.1 arrow_3.0.0
#
# loaded via a namespace (and not attached):
# [1] tidyselect_1.1.0 bit_4.0.4 compiler_3.6.0 magrittr_1.5 assertthat_0.2.1 R6_2.4.1 tools_3.6.0 glue_1.4.2
# [9] bit64_4.0.5 vctrs_0.3.1 packrat_0.5.0 rlang_0.4.7 purrr_0.3.4