@GaalDornick · Created July 7, 2017 21:32
Structured streaming Unions
package spark.poc

// Two ScalaTest cases exercising DataFrame.union with Structured Streaming file sources:
// the first unions a stream with the result of joining that stream to a static data frame,
// the second unions a stream with itself.

import java.io.File
import java.nio.file.{Files, Path, Paths}

import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.{Outcome, fixture}

import scala.collection.JavaConverters._
class StreamUnionTest extends fixture.FlatSpec {

  private val logger = Logger.getLogger(this.getClass)

  // Each test gets a fresh SparkSession, which is stopped once the test finishes.
  case class FixtureParam(sparkSession: SparkSession)

  override def withFixture(test: OneArgTest): Outcome = {
    val sparkSession = getSparkSession(test.name, "non_prod")
    try {
      withFixture(test.toNoArgTest(FixtureParam(sparkSession)))
    } finally {
      sparkSession.stop()
    }
  }

  // Lets a test body use the FixtureParam wherever a SparkSession is expected.
  implicit def fixtureParamToSparkSession(fp: FixtureParam): SparkSession = fp.sparkSession
  private def getSparkSession(appname: String, env: String, sparkConf: Option[SparkConf] = None): SparkSession = {
    logger.info("Creating SparkContext for " + appname)
    val conf = sparkConf.getOrElse(new SparkConf())
    conf
      .setAppName(appname)
      .set("spark.io.compression.codec", "lz4")
      .set("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "AES256")
      .set("spark.hadoop.fs.s3a.connection.timeout", "1800000")
      .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    if (env != "prod") {
      conf.set("quantum.jsonSchemaLocation", "file")
      conf.set("spark.master", "local[*]")
      conf.set("spark.sql.shuffle.partitions", "10")
    }
    val sc = SparkSession.builder().config(conf).getOrCreate()
    sc.sparkContext.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
    sc
  }
  // Recursively deletes a directory; the descending sort deletes children before their parents.
  private def delete(dir: Path): Unit = {
    if (Files.exists(dir)) {
      Files.walk(dir).iterator().asScala.toList
        .map(_.toFile)
        .sortWith((o1, o2) => o1.compareTo(o2) > 0)
        .foreach(_.delete)
    }
  }
"A streaming data frame" should "union with a streaming data frame" in { sparkSession =>
val oldEvents = Paths.get("target/oldEvents/").toAbsolutePath
delete(oldEvents)
Files.createDirectories(oldEvents)
val oldEventFile1=oldEvents.resolve("event1.json")
Files.createFile(oldEventFile1)
Files.write(oldEventFile1, List("""{"eventId":1, "eventType":"A", "acctId":"X"}""",
"""{"eventId":2, "eventType":"B", "acctId":"X"}""",
"""{"eventId":3, "eventType":"C", "acctId":"X"}""",
"""{"eventId":4, "eventType":"D", "acctId":"X"}""",
"""{"eventId":5, "eventType":"A", "acctId":"Y"}""",
"""{"eventId":6, "eventType":"B", "acctId":"Y"}""",
"""{"eventId":7, "eventType":"C", "acctId":"Y"}"""
).toIterable.asJava)
val dfAllOldEvents = sparkSession.read.json(oldEvents.toString)
dfAllOldEvents.createOrReplaceTempView("allOldEvents")
val newEvents = Paths.get("target/newEvents/").toAbsolutePath
delete(newEvents)
Files.createDirectories(newEvents)
val dfNewEvents = sparkSession.readStream.schema(dfAllOldEvents.schema).json(newEvents.toString)
dfNewEvents.createOrReplaceTempView("newEvents")
val dfOldEvents = sparkSession.sql("Select allOldEvents.* from allOldEvents join newEvents on allOldEvents.acctId = newEvents.acctId")
dfOldEvents.createOrReplaceTempView("oldEvents")
val dfAllEvents = dfNewEvents.union(dfOldEvents)
//val dfAllEvents = sparkSession.sql("select * from oldEvents union select * from newEvents")// dfNewEvents.union(dfOldEvents)
dfAllEvents.writeStream
.outputMode("append")
.format("console")
.start()
val newEventFile1=newEvents.resolve("eventNew1.json")
//Files.createFile(newEventFile1)
Files.write(newEventFile1, List("""{"eventId":8, "eventType":"E", "acctId":"X"}""").toIterable.asJava)
sparkSession.streams.awaitAnyTermination(10000)
}
"A streaming data frame" should "union with itself" in { sparkSession =>
val oldEvents = Paths.get("target/oldEvents/").toAbsolutePath
delete(oldEvents)
Files.createDirectories(oldEvents)
val oldEventFile1=oldEvents.resolve("event1.json")
Files.createFile(oldEventFile1)
Files.write(oldEventFile1, List("""{"eventId":1, "eventType":"A", "acctId":"X"}""",
"""{"eventId":2, "eventType":"B", "acctId":"X"}""",
"""{"eventId":3, "eventType":"C", "acctId":"X"}""",
"""{"eventId":4, "eventType":"D", "acctId":"X"}""",
"""{"eventId":5, "eventType":"A", "acctId":"Y"}""",
"""{"eventId":6, "eventType":"B", "acctId":"Y"}""",
"""{"eventId":7, "eventType":"C", "acctId":"Y"}"""
).toIterable.asJava)
val dfAllOldEvents = sparkSession.read.json(oldEvents.toString)
val newEvents = Paths.get("target/newEvents/").toAbsolutePath
delete(newEvents)
Files.createDirectories(newEvents)
val dfNewEvents = sparkSession.readStream.schema(dfAllOldEvents.schema).json(newEvents.toString)
val dfSelfJoinEvents = dfNewEvents.union(dfNewEvents)
dfSelfJoinEvents.writeStream
.outputMode("append")
.format("console")
.start()
val newEventFile1=newEvents.resolve("eventNew1.json")
//Files.createFile(newEventFile1)
Files.write(newEventFile1, List("""{"eventId":8, "eventType":"E", "acctId":"X"}""").toIterable.asJava)
sparkSession.streams.awaitAnyTermination(10000)
}
}
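
For reference, a minimal sbt build this test could compile against, run with sbt test. The version numbers below are assumptions based on the gist's date (mid-2017, around Spark 2.2 and ScalaTest 3.0) and are not taken from the original; adjust them to match your environment.

// build.sbt (sketch; versions are assumptions, not taken from the gist)
scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.2.0",
  "org.scalatest"    %% "scalatest" % "3.0.1" % Test
)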