Skip to content

Instantly share code, notes, and snippets.

@pe-suke
Created November 24, 2018 06:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save pe-suke/5b8937b558d1243abc9ad310fda26136 to your computer and use it in GitHub Desktop.
Save pe-suke/5b8937b558d1243abc9ad310fda26136 to your computer and use it in GitHub Desktop.
Contents for the blog post of November 24, 2018
package jp.gr.java_conf.pesk.job
import jp.gr.java_conf.pesk.common.datasource.InMemoryDbInitializer
import jp.gr.java_conf.pesk.conf.SparkSessionBuilder
import jp.gr.java_conf.pesk.model.{IncomingData, User}
/**
 * Sample job that loads the user table, bridges it with one incoming record,
 * writes the resulting rows back, and then checks what ended up in the table.
 *
 * The entry point is an explicit `main` method rather than `scala.App`: the
 * Spark documentation advises against `App` because its delayed initialization
 * may not work correctly for Spark applications.
 */
object Main extends SparkSessionBuilder with InMemoryDbInitializer {

  /**
   * Runs the job and then verifies the persisted records.
   *
   * @param args command-line arguments (unused)
   * @throws NoSuchElementException if no stored record has the expected version number
   * @throws AssertionError if a stored record does not have the expected field values
   */
  def main(args: Array[String]): Unit = {
    import jp.gr.java_conf.pesk.datasource.DatabaseTableSource._
    import jp.gr.java_conf.pesk.common.datasource.dao.UserDao._
    import jp.gr.java_conf.pesk.bridger.Bridger.implicits._
    import jp.gr.java_conf.pesk.sink.UserSink._
    import session.implicits._

    // Everything that may touch the Spark session lives in the try block so
    // the session is stopped even when one of the steps throws.
    try {
      initializeInMemoryDb()

      // Load data from HSQL
      val existing = loadUserTable(User.tableName)

      // Incoming data
      val incoming = Seq(IncomingData(1, "UPDATED NAME")).toDS()

      // Join and create inserting record and updating record
      val persistData = existing bridge incoming

      // Insert new record
      saveTable(User.tableName, persistData.map(_._1))

      // Update record for expiring existing record
      update(persistData.map(_._2))
    } finally {
      session.stop()
    }

    // Assert: the checks run after the session has been stopped, exactly as before.
    val stored = retrieveAll()
    stored.foreach(println(_))

    // `find` + explicit error instead of `filter(...).head`, so a missing
    // record fails with a message that says which version was not found.
    val expired = stored
      .find(_.versionNum == 1)
      .getOrElse(throw new NoSuchElementException("no stored record with versionNum == 1"))
    assert(expired.id == 1, s"expired record: unexpected id ${expired.id}")
    assert(expired.versionNum == 1, s"expired record: unexpected versionNum ${expired.versionNum}")
    assert(expired.name == "TEST1", s"expired record: unexpected name ${expired.name}")
    assert(expired.expired, "version 1 record should be marked as expired")

    val inserted = stored
      .find(_.versionNum == 2)
      .getOrElse(throw new NoSuchElementException("no stored record with versionNum == 2"))
    assert(inserted.id == 1, s"inserted record: unexpected id ${inserted.id}")
    assert(inserted.versionNum == 2, s"inserted record: unexpected versionNum ${inserted.versionNum}")
    assert(inserted.name == "UPDATED NAME", s"inserted record: unexpected name ${inserted.name}")
    assert(!inserted.expired, "version 2 record should not be marked as expired")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment