Last active
July 25, 2021 16:24
-
-
Save saisgit/38756568433b503cb014bf04963c1b55 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Parallel JDBC read: partitionColumn/lowerBound/upperBound/numPartitions make
// Spark open several concurrent connections instead of a single-threaded read.
val partitionColumn: String = "id"
// Tune per data volume and source-system capacity; each partition opens one JDBC connection.
val numPartitions: String = "4"

val readOptions: Map[String, String] = Map(
  "driver" -> JDBC_DRIVER,
  "url" -> JDBC_URL,
  "user" -> JDBC_USERNAME,
  "password" -> JDBC_PASSWORD,
  "dbtable" -> tableName,
  "partitionColumn" -> partitionColumn,
  "numPartitions" -> numPartitions,
  // NOTE: bounds only decide the partition stride; rows outside the range are still read.
  "lowerBound" -> lowerBound,
  "upperBound" -> upperBound
)

val optimizedDF = spark.read
  .format("jdbc")
  .options(readOptions)
  .load()

// Confirm the DataFrame actually got the requested number of partitions.
val partitions = optimizedDF.rdd.getNumPartitions
println(s"Number of Spark Partitions for mysql DataFrame: ${partitions}")
// Number of Spark Partitions for mysql DataFrame: 4

// Validate data skew: row count per Spark partition, ordered by partition id.
val skewDataODF = optimizedDF
  .groupBy(spark_partition_id().alias("partition_id"))
  .count()
  .orderBy(col("partition_id"))
display(skewDataODF)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment