@therako
Created May 1, 2018 13:28
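import org.apache.spark.sql.SparkSession

// getCreateTableStatement, getInsertStatement and SSTableWriter are helpers
// that this gist does not define; SSTableWriter must return an Iterator
// because it is passed to mapPartitions.
val session = SparkSession.builder().getOrCreate()
val keyspace = "keyspace"
val tableName = "tableName"
val inputPathStr = "inputPathStr"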
val cassHosts = "host1:9042,host2:9042" // 9042 is Cassandra's default native (CQL) port; 9092 is Kafka's
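val sizeInMB = 61440 // input size in MB (61440 MB = 60 GB)
val primaryKeys = Array("key1", "key2", "...")
// Target roughly 256 MB per write partition: 61440 / 256 = 240 partitions.
val writePartitions = Math.max(sizeInMB / 256, 1)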
val df = session.read.parquet(inputPathStr)
val createTableStatement = getCreateTableStatement(keyspace, tableName, df, primaryKeys)
val insertStatement = getInsertStatement(keyspace, tableName, df)
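// Repartition, sort each partition by the first column, then hand every
// partition's rows to one SSTableWriter.
df.repartition(writePartitions)
  .sortWithinPartitions(df.schema.fields(0).name)
  .rdd
  .mapPartitions(it => SSTableWriter(it, cassHosts, createTableStatement, insertStatement))
  .count() // mapPartitions is lazy; an action is needed to actually run the writers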
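The number of write partitions comes from dividing the input size by roughly 256 MB. A minimal standalone sketch of that arithmetic follows; the object name `PartitionSizing` and the `targetMB` parameter (defaulting to the 256 the gist hard-codes) are introduced here for illustration only.

```scala
// Sketch of the partition-count heuristic: split the input into chunks of
// roughly targetMB each, but never produce fewer than one partition.
object PartitionSizing {
  // targetMB is a hypothetical parameter; the gist hard-codes 256.
  def writePartitions(sizeInMB: Int, targetMB: Int = 256): Int = {
    require(targetMB > 0, "targetMB must be positive")
    math.max(sizeInMB / targetMB, 1) // integer division rounds down
  }

  def main(args: Array[String]): Unit = {
    println(writePartitions(61440)) // 240
    println(writePartitions(100))   // 1 (small inputs still get one partition)
  }
}
```

Because the division rounds down, partitions can end up somewhat larger than the target; the `max(..., 1)` guard only matters for inputs smaller than one target chunk.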
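`getCreateTableStatement` and `getInsertStatement` are not defined in this gist. The sketch below is a hypothetical guess at the kind of output such helpers produce: a CQL `CREATE TABLE` statement and a parameterised `INSERT`. To stay runnable without Spark it takes columns as `(name, CQL type)` pairs instead of a `DataFrame`; the names `CqlStatements`, `createTable` and `insert` are invented for illustration.

```scala
// Hypothetical stand-ins for getCreateTableStatement / getInsertStatement.
// Columns are (name, CQL type) pairs rather than a Spark schema, so this
// compiles without Spark; real helpers would derive them from df.schema.
object CqlStatements {
  def createTable(keyspace: String, table: String,
                  columns: Seq[(String, String)], primaryKeys: Seq[String]): String = {
    val cols = columns.map { case (name, cqlType) => s"$name $cqlType" }.mkString(", ")
    s"CREATE TABLE $keyspace.$table ($cols, PRIMARY KEY (${primaryKeys.mkString(", ")}))"
  }

  def insert(keyspace: String, table: String, columns: Seq[(String, String)]): String = {
    val names = columns.map(_._1)
    val markers = names.map(_ => "?").mkString(", ") // one bind marker per column
    s"INSERT INTO $keyspace.$table (${names.mkString(", ")}) VALUES ($markers)"
  }
}
```

With `PRIMARY KEY (key1, key2)`, CQL treats `key1` as the partition key and `key2` as a clustering column; a composite partition key needs its own parentheses, e.g. `PRIMARY KEY ((key1, key2))`.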
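To make the write step concrete, here is a simplified single-machine model of `repartition(n)` followed by `sortWithinPartitions` and a per-partition writer. It is a sketch under stated assumptions, not Spark's implementation: Spark's `Dataset.repartition(n)` distributes rows round-robin but starts at a random offset for each input partition, whereas this model always starts at 0. `PartitionModel` and `fakeWriter` are hypothetical names; `fakeWriter` stands in for `SSTableWriter` and only counts rows.

```scala
// Simplified model of
//   df.repartition(n).sortWithinPartitions(firstColumn).rdd.mapPartitions(writer)
// Rows are (key, value) pairs; round-robin assignment starts at bucket 0 here.
object PartitionModel {
  type Row = (String, Int)

  def repartitionRoundRobin(rows: Seq[Row], n: Int): Vector[Vector[Row]] = {
    require(n > 0, "n must be positive")
    val buckets = Array.fill(n)(Vector.newBuilder[Row])
    rows.zipWithIndex.foreach { case (row, i) => buckets(i % n) += row }
    buckets.map(_.result()).toVector
  }

  // Sort each partition by its first column only; no global ordering.
  def sortWithinPartitions(parts: Vector[Vector[Row]]): Vector[Vector[Row]] =
    parts.map(_.sortBy(_._1))

  // Hypothetical stand-in for SSTableWriter: consumes the partition and
  // returns an Iterator (as mapPartitions requires) holding the row count.
  def fakeWriter(it: Iterator[Row]): Iterator[Int] = Iterator(it.size)

  def main(args: Array[String]): Unit = {
    val rows = Seq("d" -> 4, "a" -> 1, "c" -> 3, "b" -> 2, "e" -> 5)
    val parts = sortWithinPartitions(repartitionRoundRobin(rows, 2))
    println(parts) // Vector(Vector((c,3), (d,4), (e,5)), Vector((a,1), (b,2)))
    println(parts.flatMap(p => fakeWriter(p.iterator))) // Vector(3, 2)
  }
}
```

Because partitioning is round-robin rather than by key, each partition is sorted internally but can hold keys from anywhere in the key range.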