Skip to content

Instantly share code, notes, and snippets.

@mdespriee
Last active January 26, 2018 23:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mdespriee/6a0c7c36c536d2ec43ac1236863e9f04 to your computer and use it in GitHub Desktop.
Save mdespriee/6a0c7c36c536d2ec43ac1236863e9f04 to your computer and use it in GitHub Desktop.
Aims to reproduce SPARK-23220: a broadcast join is transformed into a SortMergeJoin.
package ssp
import java.nio.charset.Charset
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.storage.StorageLevel
/** A hostname to filter out of the event stream, with a human-readable description.
  *
  * Note: case classes already extend [[Product]], so declaring it explicitly is
  * redundant and has been removed; marked `final` per convention for data classes.
  *
  * @param hostname the hostname to match against incoming events
  * @param descr    free-text description of why this hostname is filtered
  */
final case class HostnameFilter(
  hostname: String,
  descr: String
)
/** Minimal reproducer for SPARK-23220: a streaming left-anti broadcast join
  * being transformed into a SortMergeJoin.
  *
  * Writes one JSON event to a temp directory, streams it back with a fixed
  * schema, anti-joins against a broadcast hostname blacklist, and writes the
  * result to parquet every 20 seconds.
  *
  * Uses an explicit `main` instead of the `App` trait to avoid its
  * delayed-initialization pitfalls.
  */
object SimpleTest {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("tests")
      .getOrCreate()
    import spark.implicits._

    // Single raw event; schema fields below must match these keys.
    val rawWebEventJson: String =
      """
        |{"app_id":"foo","event":"foo","event_id":"foo","timestamp":"2017-12-10T04:00:20.219Z","hostname":"foo"}
      """.stripMargin

    // Write the sample event to a fresh temp dir that the stream will read.
    val dir = Files.createTempDirectory(Paths.get("/tmp/"), "events")
    val fw = Files.newBufferedWriter(dir.resolve("data"), Charset.forName("UTF-8"))
    // try/finally ensures the writer is closed even if write() throws.
    try fw.write(rawWebEventJson)
    finally fw.close()

    // All fields kept as strings; timestamp parsing is irrelevant to the repro.
    val inputSchema = StructType(
      StructField("app_id", StringType) ::
        StructField("event", StringType) ::
        StructField("event_id", StringType) ::
        StructField("timestamp", StringType) ::
        StructField("hostname", StringType) ::
        Nil
    )

    val ds = spark.readStream
      .format("json")
      .option("path", dir.toString)
      .schema(inputSchema)
      .load()

    // Static, persisted side of the join; broadcast() below is the hint that
    // SPARK-23220 reportedly ignores, falling back to SortMergeJoin.
    val hostnames: Dataset[HostnameFilter] = Seq(HostnameFilter("host", "")).toDS
    hostnames.persist(StorageLevel.MEMORY_AND_DISK_SER)

    val query = ds
      .join(broadcast(hostnames), Seq("hostname"), "leftanti")
      .writeStream
      .option("checkpointLocation", "/tmp/checkpoint/event")
      .format("parquet")
      .queryName("event")
      .option("path", "/tmp/output/event")
      .trigger(Trigger.ProcessingTime(20000)) // 20s micro-batch interval
      .outputMode(OutputMode.Append())

    query.start()
    // Block until every active streaming query terminates.
    spark.streams.active.foreach(_.awaitTermination())
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment