Skip to content

Instantly share code, notes, and snippets.

@PeterCorless
Last active October 8, 2018 16:26
Show Gist options
  • Save PeterCorless/8ad5f2f9c37ce5a8daad90a98e85896b to your computer and use it in GitHub Desktop.
Save PeterCorless/8ad5f2f9c37ce5a8daad90a98e85896b to your computer and use it in GitHub Desktop.
Hooking up Spark and ScyllaDB: Part 3
cqlsh> SELECT * FROM test.test;
id | elems | flag | name | toggles
----+----------------+-------+---------+--------------------
2 | ['Oracle'] | True | Tzach | {'cookies': True}
3 | ['IBM'] | False | Nadav | {'cookies': False}
4 | ['Cloudius'] | False | Glauber | {'cookies': False}
1 | ['Red', 'Hat'] | True | Dor | {'cookies': True}
(4 rows)
-- Keyspace used by the examples: SimpleStrategy replication with a replication factor of 1.
CREATE KEYSPACE test
  WITH REPLICATION = {
    'class': 'SimpleStrategy',
    'replication_factor': 1
  };
-- Target table for the write examples; `id` alone is the primary key.
CREATE TABLE test.test (
  id      bigint,
  name    text,
  flag    boolean,
  elems   list<text>,
  toggles map<text, boolean>,
  PRIMARY KEY (id)
);
-- Same column names as test.test, but `name` is declared bigint (text in test.test)
-- and `flag` is declared text (boolean in test.test).
CREATE TABLE test.test_mismatched (
  id      bigint,
  name    bigint,
  flag    text,
  elems   list<text>,
  toggles map<text, boolean>,
  PRIMARY KEY (id)
);
/** A single row of example data.
  *
  * @param id      numeric key of the row
  * @param name    textual name
  * @param flag    boolean flag
  * @param elems   list of text elements
  * @param toggles map from a text key to a boolean value
  */
case class Record(
  id: Long,
  name: String,
  flag: Boolean,
  elems: List[String],
  toggles: Map[String, Boolean]
)
// Build a four-row DataFrame from in-memory Record values.
val df = {
  val rows = Seq(
    Record(1L, "Dor", true, List("Red", "Hat"), Map("cookies" -> true)),
    Record(2L, "Tzach", true, List("Oracle"), Map("cookies" -> true)),
    Record(3L, "Nadav", false, List("IBM"), Map("cookies" -> false)),
    Record(4L, "Glauber", false, List("Cloudius"), Map("cookies" -> false))
  )
  spark.createDataFrame(rows)
}
// df: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 3 more fields]
// Prepare a writer for df that targets table "test" in keyspace "test".
// This expression only yields a DataFrameWriter (see the REPL output below);
// the write itself is requested separately via writer.save().
val writer = df.write.cassandraFormat(table = "test", keyspace = "test")
// writer: org.apache.spark.sql.DataFrameWriter[org.apache.spark.sql.Row] = org.apache.spark.sql.DataFrameWriter@6cf47d05
# Open an interactive cqlsh prompt inside the "scylla" service container.
# NOTE(review): presumably requires the services to be running already
# (see `docker-compose up -d` below) — confirm the intended order of these snippets.
docker-compose exec scylla cqlsh
# Switch to the example's directory and start its services in the background.
cd scylla-and-spark/writing-to-scylla
docker-compose up -d
// Create test.another_test from df's schema, partitioned by "id",
// then store the contents of df in the newly created table.
df.createCassandraTable(
  keyspaceName = "test", tableName = "another_test",
  partitionKeyColumns = Some(Seq("id")))
df.write.cassandraFormat(table = "another_test", keyspace = "test").save()
import scala.util.Random
// Produces a new Record on every call: random id, random 5-character name,
// flag = false, and empty elems / toggles.
def randomRecord: Record =
  Record(
    id = Random.nextLong(),
    name = Random.nextString(5),
    flag = false,
    elems = Nil,
    toggles = Map.empty
  )

// Generate 25000 random records and write them to test.test with a single save().
val largeDf = {
  val randomRows = List.fill(25000)(randomRecord)
  spark.createDataFrame(randomRows)
}
largeDf.write.cassandraFormat(table = "test", keyspace = "test").save()
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import com.datastax.spark.connector.cql.{CassandraConnector, Schema}
// Fetch the schema stored in the database, restricted to test.test_mismatched,
// and render the first matching table as CQL (an empty string if none is found).
val connector = CassandraConnector(spark.sparkContext)
val schema = Schema.fromCassandra(
  connector, keyspaceName = Some("test"), tableName = Some("test_mismatched"))
val tableDef = schema.tables.headOption
tableDef.fold("")(_.cql)
// res22: String =
// CREATE TABLE "test"."test_mismatched" (
// "id" bigint,
// "elems" list<varchar>,
// "flag" boolean,
// "name" bigint,
// "toggles" map<varchar, boolean>,
// PRIMARY KEY (("id"))
// )
# Start a spark-shell inside the "spark-master" service container.
#  - spark.driver.host is set to spark-master
#  - spark.cassandra.connection.host points the connector at the host named "scylla"
#  - --packages pulls the Spark Cassandra connector (2.3.0, Scala 2.11 build)
#    and commons-configuration 1.10
docker-compose exec spark-master spark-shell \
--conf spark.driver.host=spark-master \
--conf spark.cassandra.connection.host=scylla \
--packages datastax:spark-cassandra-connector:2.3.0-s_2.11,commons-configuration:commons-configuration:1.10
java.util.NoSuchElementException: Columns not found in table test.test_mismatched: name
import com.datastax.spark.connector.cql.TableDef
import com.datastax.driver.core.ProtocolVersion
// Build the table definition the connector derives from df, labelled as test.test,
// and render it as a CQL CREATE TABLE statement.
val dfTableDef =
  TableDef.fromDataFrame(df, keyspaceName = "test", tableName = "test", ProtocolVersion.V4)
dfTableDef.cql
// res18: String =
// CREATE TABLE "test"."test" (
// "id" bigint,
// "name" varchar,
// "flag" boolean,
// "elems" list<varchar>,
// "toggles" map<varchar, boolean>,
// PRIMARY KEY (("id"))
// )
# Query the recorded tracing sessions from system_traces.sessions (sample output follows).
docker-compose exec scylla cqlsh -e "SELECT * FROM system_traces.sessions"
session_id | request | duration | parameters
-------------------------------------+----------------------------------------------------------------+----------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
21fd12c2-b78f-11e8-833c-000000000000 | Execute CQL3 prepared query [1ac00b8e2ea23672b0f686b2fd50ad38] | 374 | {'consistency_level': 'LOCAL_QUORUM', 'page_size': '5000', 'query': 'INSERT INTO "test"."test" ("id", "name", "flag", "elems", "toggles") VALUES (:"id", :"name", :"flag", :"elems", :"toggles")', 'serial_consistency_level': 'SERIAL'}
2123a2b0-b78f-11e8-833c-000000000000 | Execute CQL3 prepared query [1ac00b8e2ea23672b0f686b2fd50ad38] | 5061 | {'consistency_level': 'LOCAL_QUORUM', 'page_size': '5000', 'query': 'INSERT INTO "test"."test" ("id", "name", "flag", "elems", "toggles") VALUES (:"id", :"name", :"flag", :"elems", :"toggles")', 'serial_consistency_level': 'SERIAL'}
1b5ebcc2-b78f-11e8-833c-000000000000 | Execute CQL3 prepared query [1ac00b8e2ea23672b0f686b2fd50ad38] | 310 | {'consistency_level': 'LOCAL_QUORUM', 'page_size': '5000', 'query': 'INSERT INTO "test"."test" ("id", "name", "flag", "elems", "toggles") VALUES (:"id", :"name", :"flag", :"elems", :"toggles")', 'serial_consistency_level': 'SERIAL'}
21719ba1-b78f-11e8-833c-000000000000 | Execute CQL3 prepared query [1ac00b8e2ea23672b0f686b2fd50ad38] | 266 | {'consistency_level': 'LOCAL_QUORUM', 'page_size': '5000', 'query': 'INSERT INTO "test"."test" ("id", "name", "flag", "elems", "toggles") VALUES (:"id", :"name", :"flag", :"elems", :"toggles")', 'serial_consistency_level': 'SERIAL'}
20b2df81-b78f-11e8-833c-000000000000 | Execute CQL3 prepared query [1ac00b8e2ea23672b0f686b2fd50ad38] | 1059 | {'consistency_level': 'LOCAL_QUORUM', 'page_size': '5000', 'query': 'INSERT INTO "test"."test" ("id", "name", "flag", "elems", "toggles") VALUES (:"id", :"name", :"flag", :"elems", :"toggles")', 'serial_consistency_level': 'SERIAL'}
# Remove all rows from test.test, then set the trace probability on the scylla node to 1
# (presumably so that every subsequent request is traced — confirm against the nodetool docs).
docker-compose exec scylla cqlsh -e "TRUNCATE TABLE test.test;"
docker-compose exec scylla nodetool settraceprobability 1
// Attempt to write df into test.test_mismatched, whose `name` and `flag` columns are declared
// with different types (bigint / text) than df's fields (string / boolean).
// The attempt ends in the exception shown below.
df.write.cassandraFormat(keyspace = "test", table = "test_mismatched").save()
// ... java.lang.NumberFormatException: For input string: "a" ...
// Load table "test" of keyspace "test" from Scylla into a Spark DataFrame
val dfFromScylla = spark.read.cassandraFormat(table = "test", keyspace = "test").load()
// Derive a new Scylla table, test.test_duplicated, from the loaded DataFrame's schema,
// keeping "id" as the partition key
dfFromScylla.createCassandraTable(
  keyspaceName = "test", tableName = "test_duplicated",
  partitionKeyColumns = Some(Seq("id")))
// Copy the loaded rows into the new table
dfFromScylla.write.cassandraFormat(table = "test_duplicated", keyspace = "test").save()
// Execute the write prepared in `writer`. In this run it fails because the connector
// cannot find the target test.test (see the exception below).
writer.save()
// java.io.IOException: Couldn't find test.test or any similarly named keyspace and table pairs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment