Skip to content

Instantly share code, notes, and snippets.

Created July 18, 2016 18:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clarkfitzg/08ad74e2099f51e5138b3e0ba72bdc07 to your computer and use it in GitHub Desktop.
Save clarkfitzg/08ad74e2099f51e5138b3e0ba72bdc07 to your computer and use it in GitHub Desktop.
Use serialization to store arbitrary R objects as key value pairs in Spark DataFrames
# Mon Jul 18 08:08:09 PDT 2016
# Goal: Store arbitrary objects in DataFrames as bytes to make dapply more
# general
# Inefficient- this uses CLOB rather than BLOB
# Comments throughout this question are helpful
# Note that setting this global option will NOT show up in Spark. So don't use it.
#options(stringsAsFactors = FALSE)
to_byte_string = function(x) {
# Convert arbitrary R object into string of bytes
# Better way would be to use a binary connection
paste(as.character(serialize(x, connection = NULL)), collapse = " ")
from_byte_string = function(x) {
xcharvec = strsplit(x, " ")[[1]]
xhex = as.hexmode(xcharvec)
xraw = as.raw(xhex)
a = 1:10
ab = to_byte_string(a)
# Sanity check
b = letters
bb = to_byte_string(b)
c = as.factor(LETTERS)
cb = to_byte_string(c)
# The data.frame is only being used as a key value store
local_df = data.frame(key = 1:3, value = c(ab, bb, cb)
, stringsAsFactors = FALSE)
sapply(local_df, class)
# A UDF we'd like to apply to each element
take5 = function(x) x[1:5]
# Side note - Spark doesn't seem to grab the default parameter value
# if I write it like this:
# wrapper = function(df, func = take5){
# I see an error message like:
# Error in serialize(x, connection = NULL) : object 'take5' not found
wrapper = function(df){
# Necessary because we can't assume that every row corresponds to a
# partition
func_bytes = function(xbytes){
# Deserialize, apply function, reserialize
# This would be an excessive amount of serialization if doing in
# pipelined manner
x = from_byte_string(xbytes)
# Actual function body is here:
fx = x[1:5]
out = sapply(df$value, func_bytes)
data.frame(key = df$key, value = out, stringsAsFactors = FALSE)
local_df2 = wrapper(local_df)
# Worry about the key later
local_results = lapply(local_df2$value, from_byte_string)
# Now for the Spark stuff
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
spark_df = createDataFrame(sqlContext, local_df)
spark_df2 = dapplyCollect(spark_df, wrapper)
spark_results = lapply(spark_df2$value, from_byte_string)
# This gives what I expected:
# > spark_results = lapply(spark_df2$value, from_byte_string)
# > spark_results
# [[1]]
# [1] 1 2 3 4 5
# [[2]]
# [1] "a" "b" "c" "d" "e"
# [[3]]
# [1] A B C D E
# Levels: A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment