/**
* Author: github.com/ganeshchand
* Date: 03/04/2021
* Whether a schema must be specified when reading a given source format depends on which reader you use.
* spark.read returns the batch DataFrameReader
* spark.readStream returns the streaming DataStreamReader
* Let's write a quick test to see which reader forces us to specify a schema on read
*/
// step 1: generate a test dataset for csv, json, parquet, orc and delta
val basePath = "/ganesh/tmp/stream_format_test"
Seq("csv", "json", "parquet", "orc", "delta").foreach { format =>
  spark.range(10).write.format(format).save(s"$basePath/output/$format")
}
// step 2: read the test dataset using the batch and streaming reader APIs
Seq("""batch: spark.read()""", """streaming: spark.readStream""").foreach {
  readType =>
    println("\n")
    Seq("csv", "json", "parquet", "orc", "delta")
      .map { format =>
        // Try captures the exception thrown when a reader refuses to load without a schema
        val df = scala.util.Try {
          readType match {
            case s: String if s.startsWith("batch") =>
              spark.read
                .format(format)
                .option("inferSchema", true)
                .load(s"$basePath/output/$format")
            case s: String if s.startsWith("streaming") =>
              spark.readStream
                .format(format)
                .option("inferSchema", true)
                .load(s"$basePath/output/$format")
          }
        }.toOption
        (format, df)
      }
      .foreach { case (f, df) =>
        if (df.isDefined)
          println(s"$readType for $f doesn't require schema")
        else println(s"$readType for $f requires schema")
      }
}
/*
batch: spark.read() for csv doesn't require schema
batch: spark.read() for json doesn't require schema
batch: spark.read() for parquet doesn't require schema
batch: spark.read() for orc doesn't require schema
batch: spark.read() for delta doesn't require schema
streaming: spark.readStream for csv requires schema
streaming: spark.readStream for json requires schema
streaming: spark.readStream for parquet requires schema
streaming: spark.readStream for orc requires schema
streaming: spark.readStream for delta doesn't require schema
*/
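// A minimal sketch of two ways to get past the "requires schema" cases above,
// assuming the same `spark` session and `basePath` as in the test. The names
// idSchema, csvStream and jsonStream are illustrative and not part of the original test.
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Option 1: pass the schema explicitly.
// spark.range(10) writes a single LongType column named "id".
val idSchema = StructType(Seq(StructField("id", LongType, nullable = true)))
val csvStream = spark.readStream
  .format("csv")
  .schema(idSchema)
  .load(s"$basePath/output/csv")

// Option 2: let file sources infer the schema by enabling
// spark.sql.streaming.schemaInference, which is off by default.
spark.conf.set("spark.sql.streaming.schemaInference", "true")
val jsonStream = spark.readStream
  .format("json")
  .load(s"$basePath/output/json")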