Last active February 24, 2023 21:07

Spark-shell with S3A based checkpointing

Spark context Web UI available at http://nirisvara:4040
Spark context available as 'sc' (master = local[*], app id = local-1677271782301).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :load SparkStreamingFromDirectory-S3A.scala
Loading SparkStreamingFromDirectory-S3A.scala...
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
defined object SparkStreamingFromDirectory

scala> SparkStreamingFromDirectory.main(Array(""))
23/02/25 02:14:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)

 |-- Zipcode: string (nullable = true)
 |-- count: long (nullable = false)

Batch: 0
|76166  |2    |
|32564  |2    |
|85210  |2    |
|36275  |3    |
|709    |3    |
|35146  |3    |
|708    |2    |
|35585  |3    |
|32046  |2    |
|27203  |4    |
|34445  |2    |
|27007  |4    |
|704    |10   |
|27204  |4    |
|34487  |2    |
|85209  |2    |
|76177  |4    |

Number of API calls

~ mc support top api myminio/

API                             RX      TX      CALLS   ERRORS 
s3.CopyObject                   48 KiB  47 KiB  208     0     
s3.DeleteMultipleObjects        146 KiB 47 KiB  417     0     
s3.DeleteObject                 32 KiB  0 B     211     0     
s3.GetObject                    168 B   1.3 KiB 1       0     
s3.HeadObject                   441 KiB 0 B     2950    0     
s3.ListObjectsV2                408 KiB 1.4 MiB 2732    0     
s3.PutObject                    128 KiB 0 B     419     0     


Total: 6938 CALLS, 1.2 MiB RX, 1.5 MiB TX - in 72.36s

Number of entries left behind by this behavior on a versioned bucket

~ mc ls -r --versions myminio/process-runner/ | wc -l

Out of which 614 are actual objects

~  mc ls -r --versions myminio/process-runner/ | grep PUT | wc -l

and almost 409 are delete markers (soft deletes)

~ mc ls -r --versions myminio/process-runner/ | grep DEL | wc -l

Actual objects in the namespace, without the versioning lookup

~ mc ls -r myminio/process-runner/  | wc -l
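
The three versioned listings above each rescan the bucket. As a minimal sketch, the same totals can be gathered in one pass by piping a single listing through awk. The helper name `tally_versions` is illustrative and not part of mc; it matches the same PUT and DEL strings that the grep commands above rely on.

```shell
#!/bin/sh
# Tally version entries read on stdin (e.g. from `mc ls -r --versions`).
# Mirrors the `grep PUT` / `grep DEL` filters used above: a line counts as
# PUT or DEL if it contains that string anywhere.
tally_versions() {
  awk '
    /PUT/ { put++ }
    /DEL/ { del++ }
    END   { printf "total=%d put=%d del=%d\n", NR, put, del }
  '
}

# Usage against a live server (same bucket path as the commands above):
#   mc ls -r --versions myminio/process-runner/ | tally_versions
```

The function only reads stdin, so it can be tried on a saved listing before pointing it at a real bucket.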

After Direct Checkpointing Write Optimization

Spark context Web UI available at http://nirisvara:4040
Spark context available as 'sc' (master = local[*], app id = local-1677271782301).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.3.2
Using Scala version 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.17)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :load SparkStreamingFromDirectory
SparkStreamingFromDirectory-S3A.scala   SparkStreamingFromDirectory.scala

scala> :load SparkStreamingFromDirectory.scala
Loading SparkStreamingFromDirectory.scala...
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
defined object SparkStreamingFromDirectory

scala> SparkStreamingFromDirectory.main(Array(""))
23/02/25 02:20:25 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: string (nullable = true)
 |-- Long: string (nullable = true)
 |-- Xaxis: string (nullable = true)
 |-- Yaxis: string (nullable = true)
 |-- Zaxis: string (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: string (nullable = true)

 |-- Zipcode: string (nullable = true)
 |-- count: long (nullable = false)

Batch: 0
|76166  |2    |
|32564  |2    |
|85210  |2    |
|36275  |3    |
|709    |3    |
|35146  |3    |
|708    |2    |
|35585  |3    |
|32046  |2    |
|27203  |4    |
|34445  |2    |
|27007  |4    |
|704    |10   |
|27204  |4    |
|34487  |2    |
|85209  |2    |
|76177  |4    |

Number of API calls

~ mc support top api myminio/

API                     RX      TX      CALLS   ERRORS 
s3.GetObject            159 B   1.3 KiB 1       0     
s3.HeadObject           1.5 KiB 0 B     10      0     
s3.ListObjectVersions   765 B   2.0 KiB 5       0     
s3.PutObject            88 KiB  0 B     208     0     


Total: 224 CALLS, 90 KiB RX, 3.3 KiB TX - in 17.00s

Actual number of valid objects

~ mc ls -r --versions myminio/process-runner/ | wc -l

Actual objects on namespace without versioning lookup

~ mc ls -r myminio/process-runner/  | wc -l

Optimization can be seen in terms of total time taken for Batch '0'

Without optimization    With optimization
72 secs                 17 secs

Namespace pollution (total DEL markers)

Without optimization    With optimization
409                     0

Total number of excess objects on namespace

Without optimization                      With optimization
818 (out of which 409 are DEL markers)    0

Total number of API calls

Without optimization    With optimization
6938                    224

Ratio of API calls to objects

Without optimization    With optimization
33.8x                   1.09x
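
The comparisons above are plain divisions of the figures reported by `mc support top api`. A small sketch for re-deriving them follows; the `ratio` helper is an illustrative name. The object count behind the per-object ratios does not appear in the captured output, so the value 205 below is an assumption, chosen only because it reproduces the published 33.8x and 1.09x.

```shell
#!/bin/sh
# ratio NUM DEN [DECIMALS]: print NUM / DEN rounded to DECIMALS places
# (default 1 decimal place).
ratio() {
  awk -v n="$1" -v d="$2" -v p="${3:-1}" 'BEGIN {
    fmt = "%." p "f\n"
    printf fmt, n / d
  }'
}

ratio 6938 224        # reduction in total API calls
ratio 72.36 17.00     # reduction in Batch 0 wall time

OBJECTS=205           # ASSUMPTION: not shown in the captured output
ratio 6938 "$OBJECTS"     # calls per object, default checkpointing
ratio 224 "$OBJECTS" 2    # calls per object, direct checkpointing
```

By these figures the optimized run issues roughly 31 times fewer API calls and finishes Batch 0 about 4.3 times faster.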

This shows that the default checkpoint implementation shipped with Spark is poorly optimized for object storage. Direct Checkpointing is recommended instead: in this test it cut Batch 0 from 72 secs to 17 secs, reduced API calls from 6938 to 224, and left no delete markers behind.
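
The logs above do not show how the optimized run was configured. One mechanism Spark provides for swapping the checkpoint writer is the `spark.sql.streaming.checkpointFileManagerClass` setting. The sketch below only prints a spark-shell command line that sets it. The helper name and the class name `com.example.DirectCheckpointFileManager` are hypothetical placeholders, and whether this gist used that setting is an assumption.

```shell
#!/bin/sh
# Print (do not run) a spark-shell invocation that selects a custom
# checkpoint file manager class for Structured Streaming.
# $1 is the fully qualified class name of the manager to use.
checkpoint_manager_cmd() {
  printf 'spark-shell --conf spark.sql.streaming.checkpointFileManagerClass=%s\n' "$1"
}

# Hypothetical class name, for illustration only; substitute the
# implementation actually deployed on the cluster's classpath.
checkpoint_manager_cmd com.example.DirectCheckpointFileManager
```

Printing the command rather than executing it keeps the sketch safe to run on a machine without Spark installed.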
