Created
May 1, 2018 13:28
-
-
Save therako/1e809cd35c563da228a4b585f7a2e90a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Reads a Parquet dataset and feeds each Spark partition to SSTableWriter together with the
// CREATE TABLE and INSERT statements derived from the DataFrame.
// `session`, `getCreateTableStatement`, `getInsertStatement` and `SSTableWriter` are defined
// outside this snippet.
val keyspace = "keyspace"
val tableName = "tableName"
val inputPathStr = "inputPathStr"
// NOTE(review): 9092 is Kafka's default port; Cassandra's native transport default is 9042.
// Confirm which port SSTableWriter expects here.
val cassHosts = "host1:9092,host2:9092"
// Presumably the total size of the input data in MB (61440 MB = 60 GB) — confirm against caller.
val sizeInMB = 61440
val primaryKeys = Array("key1", "key2", "...")
// One write partition per 256 MB (integer division), but never fewer than one partition.
val writePartitions = Math.max(sizeInMB / 256, 1)
val df = session.read.parquet(inputPathStr)
val createTableStatement = getCreateTableStatement(keyspace, tableName, df, primaryKeys)
val insertStatement = getInsertStatement(keyspace, tableName, df)
// Rows are sorted inside each partition by the FIRST column of the DataFrame schema.
// NOTE(review): presumably that column is the table's partition key; it is not taken from
// `primaryKeys` — verify the column order of the input matches.
// NOTE(review): `mapPartitions` is a lazy transformation; nothing is written until an action
// (e.g. count/collect/foreach) is invoked on the resulting RDD.
df.repartition(writePartitions).sortWithinPartitions(df.schema.fields(0).name).rdd.mapPartitions(
  it => SSTableWriter(it, cassHosts, createTableStatement, insertStatement)
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment