Skip to content

Instantly share code, notes, and snippets.

@bhajer3
Last active October 19, 2015 19:42
Show Gist options
  • Save bhajer3/c3effa92de8e3cfc4fee to your computer and use it in GitHub Desktop.
Save bhajer3/c3effa92de8e3cfc4fee to your computer and use it in GitHub Desktop.
I want to create a data frame from a Cassandra keyspace and column family in sparkR.
I am able to create data frames from tables which do not include any Cassandra collection datatypes,
such as Map, Set and List. But, many of the schemas that I need data from, do include these collection data types.
Here is my local environment.
SparkR Version: 1.5.1
Cassandra Version: 2.1.6
R Version: 3.2.2
Cassandra Connector version: 1.5.0-M2
To test this issue, I did the following iterative process.
sudo ./sparkR --packages com.datastax.spark:spark-cassandra-connector_2.10:1.5.0-M2 --conf spark.cassandra.connection.host=127.0.0.1
Running this command, with sparkR gives me access to the spark cassandra connector package I need,
and connects me to my local cqlsh server ( which is up and running while running this code in sparkR shell ).
CREATE TABLE test_table (
column_1 int,
column_2 text,
column_3 float,
column_4 uuid,
column_5 timestamp,
column_6 boolean,
column_7 timeuuid,
column_8 bigint,
column_9 blob,
column_10 ascii,
column_11 decimal,
column_12 double,
column_13 inet,
column_14 varchar,
column_15 varint,
PRIMARY KEY( ( column_1, column_2 ) )
);
All of the above data types are supported. I insert dummy data after creating this test schema.
For example, now in my sparkR shell, I run the following code.
df.test <- read.df(sqlContext, source = "org.apache.spark.sql.cassandra", keyspace = "datahub", table = "test_table")
assigns with no errors, then,
> schema(df.test)
StructType
|-name = "column_1", type = "IntegerType", nullable = TRUE
|-name = "column_2", type = "StringType", nullable = TRUE
|-name = "column_10", type = "StringType", nullable = TRUE
|-name = "column_11", type = "DecimalType(38,18)", nullable = TRUE
|-name = "column_12", type = "DoubleType", nullable = TRUE
|-name = "column_13", type = "InetAddressType", nullable = TRUE
|-name = "column_14", type = "StringType", nullable = TRUE
|-name = "column_15", type = "DecimalType(38,0)", nullable = TRUE
|-name = "column_3", type = "FloatType", nullable = TRUE
|-name = "column_4", type = "UUIDType", nullable = TRUE
|-name = "column_5", type = "TimestampType", nullable = TRUE
|-name = "column_6", type = "BooleanType", nullable = TRUE
|-name = "column_7", type = "UUIDType", nullable = TRUE
|-name = "column_8", type = "LongType", nullable = TRUE
|-name = "column_9", type = "BinaryType", nullable = TRUE
Schema is correct.
> class(df.test)
[1] "DataFrame"
attr(,"package")
[1] "SparkR"
df.test is clearly defined to be a DataFrame Object.
> head(df.test)
column_1 column_2 column_10 column_11 column_12 column_13 column_14 column_15
1 1 hello NA NA NA NA NA NA
column_3 column_4 column_5 column_6 column_7 column_8 column_9
1 3.4 NA NA NA NA NA NA
sparkR is reading from the column_family correctly, but now lets add a collection data type to the schema.
Now I will drop that test_table, and recreate the table, with with an extra column of data type map<text,int>
CREATE TABLE test_table (
column_1 int,
column_2 text,
column_3 float,
column_4 uuid,
column_5 timestamp,
column_6 boolean,
column_7 timeuuid,
column_8 bigint,
column_9 blob,
column_10 ascii,
column_11 decimal,
column_12 double,
column_13 inet,
column_14 varchar,
column_15 varint,
column_16 map<text,int>,
PRIMARY KEY( ( column_1, column_2 ) )
);
After inserting dummy data into the new test schema,
> df.test <- read.df(sqlContext, source = "org.apache.spark.sql.cassandra", keyspace = "datahub", table = "test_table")
assigns with no errors,
> schema(df.test)
StructType
|-name = "column_1", type = "IntegerType", nullable = TRUE
|-name = "column_2", type = "StringType", nullable = TRUE
|-name = "column_10", type = "StringType", nullable = TRUE
|-name = "column_11", type = "DecimalType(38,18)", nullable = TRUE
|-name = "column_12", type = "DoubleType", nullable = TRUE
|-name = "column_13", type = "InetAddressType", nullable = TRUE
|-name = "column_14", type = "StringType", nullable = TRUE
|-name = "column_15", type = "DecimalType(38,0)", nullable = TRUE
|-name = "column_16", type = "MapType(StringType,IntegerType,true)", nullable = TRUE
|-name = "column_3", type = "FloatType", nullable = TRUE
|-name = "column_4", type = "UUIDType", nullable = TRUE
|-name = "column_5", type = "TimestampType", nullable = TRUE
|-name = "column_6", type = "BooleanType", nullable = TRUE
|-name = "column_7", type = "UUIDType", nullable = TRUE
|-name = "column_8", type = "LongType", nullable = TRUE
|-name = "column_9", type = "BinaryType", nullable = TRUE
correct schema is returned.
> class(df.test)
[1] "DataFrame"
attr(,"package")
[1] "SparkR"
Object is a DataFrame, but now when we see if the dataFrame actually contains records from the column_family.
> head(df.test)
Error in as.data.frame.default(x[[i]], optional = TRUE) :
cannot coerce class ""jobj"" to a data.frame
Note, I will always get the above error, when calling head on a data frame that is read from a cassandra column_family
that has one or more cassandra collection data types (list, set, map).
I know R has list and vector data types which can probably be used to support Cassandra Sets, and Lists, but for Map,
there are packages which enable Hash support, which can be used to replicate a cassandra map data type.
Could we log this as a bug? Cassandra collection data types are widely used in our schemas,
and we want to be able to build data frame from those Cassandra schemas.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment