Skip to content

Instantly share code, notes, and snippets.

@jpassaro
Created April 14, 2022 20:30
Show Gist options
  • Save jpassaro/886d5febe6e40bced03a3691115c84d5 to your computer and use it in GitHub Desktop.
Spark bug reproduction: a Structured Streaming stream-stream left-outer `joinWith` between two watermarked rate-source streams, with a time-interval join condition, written to a Parquet sink in Append mode.
// Build definition for the Spark bug-reproduction project.
ThisBuild / scalaVersion     := "2.12.14"
ThisBuild / version          := "0.1.0-SNAPSHOT-o"
ThisBuild / organization     := "com.example"
ThisBuild / organizationName := "example"

// Single Spark version shared by both Spark artifacts.
lazy val sparkVersion = "3.2.0"

lazy val root = (project in file("."))
  .settings(
    name := "datos-spark",
    // Spark core plus SQL (SQL provides the Structured Streaming API used by Main).
    libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion,
    libraryDependencies += "org.apache.spark" %% "spark-sql"  % sparkVersion,
    // Entry point picked up by `sbt run`.
    Compile / run / mainClass := Some("example.Main"),
    // Pin scala-xml so transitive dependencies agree on one version.
    dependencyOverrides += "org.scala-lang.modules" %% "scala-xml" % "1.3.0",
  )
package example
import org.apache.spark.sql.SparkSession
import java.time.Instant
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.{functions => F}
import org.apache.spark.sql.streaming.OutputMode
object Main extends App {
// Entry point reproducing a Spark Structured Streaming issue with a
// stream-stream left-outer joinWith (the commented-out `.join(` line below
// marks the untyped-join alternative being compared against).
def go(spark: SparkSession) = {
import spark.implicits._
// `def` (not `val`): each reference builds a fresh rate-source stream,
// so the left and right join sides read independent streams (1 row/sec).
def stream =
spark.readStream
.format("rate")
.option("rowsPerSecond", 1)
.load()
// Two fixed records captured at construction time; the 50 ms sleep puts
// whenB slightly after whenA, within the +/-200 ms join interval below.
val example1 = ExampleA("id", java.sql.Timestamp.from(Instant.now()))
Thread.sleep(50)
val example2 = ExampleB("id2", 2, java.sql.Timestamp.from(Instant.now()))
// Each side takes a single row from the rate stream and replaces it with
// the pre-built record, yielding typed Datasets of ExampleA / ExampleB.
// NOTE(review): idA is "id" but idB is "id2", so the equality condition
// below can never match — presumably intentional, to exercise the
// leftOuter null-padding path of the join; confirm with the gist author.
val streamingDatasetLeft = stream.limit(1).map(_ => example1)
val streamingDatasetRight = stream.limit(1).map(_ => example2)
val interval = F.expr("interval 200 milliseconds")
// Stream-stream outer join: both sides carry a 1-second watermark (Spark
// requires watermarks plus a time-range condition for outer joins in
// Append mode), and whenB is constrained to whenA +/- 200 ms.
val query = streamingDatasetLeft
.withWatermark("whenA", "1 second")
//.join(
.joinWith(
streamingDatasetRight.withWatermark("whenB", "1 second"),
($"idA" === $"idB") && $"whenB".between($"whenA" - interval, $"whenA" + interval),
"leftOuter"
).writeStream.outputMode(OutputMode.Append())
.format("parquet")
.option("path", "/tmp/spark-example/table")
.option("checkpointLocation", "/tmp/spark-example/checkpoint")
.start()
// Let the streaming query run for at most 5 seconds, then always stop it.
try {
query.awaitTermination(5000)
} finally {
query.stop()
}
}
// Local-mode session using all available cores; lazy so it is only built
// when `go` is invoked below.
lazy val spark = SparkSession.builder
.master("local[*]")
.getOrCreate()
// App-trait body: runs at object initialization; session is always stopped.
try {
go(spark)
} finally {
spark.stop()
}
}
case class ExampleA(idA: String, whenA: java.sql.Timestamp)
case class ExampleB(idB: String, value: Int, whenB: java.sql.Timestamp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment