Created
November 24, 2018 06:05
-
-
Save pe-suke/5b8937b558d1243abc9ad310fda26136 to your computer and use it in GitHub Desktop.
contents for blog in 2018 Nov 24th
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package jp.gr.java_conf.pesk.job | |
import jp.gr.java_conf.pesk.common.datasource.InMemoryDbInitializer | |
import jp.gr.java_conf.pesk.conf.SparkSessionBuilder | |
import jp.gr.java_conf.pesk.model.{IncomingData, User} | |
object Main extends App with SparkSessionBuilder with InMemoryDbInitializer { | |
import jp.gr.java_conf.pesk.datasource.DatabaseTableSource._ | |
import jp.gr.java_conf.pesk.common.datasource.dao.UserDao._ | |
import jp.gr.java_conf.pesk.bridger.Bridger.implicits._ | |
import jp.gr.java_conf.pesk.sink.UserSink._ | |
import session.implicits._ | |
initializeInMemoryDb() | |
// Load data from HSQL | |
val existing = loadUserTable(User.tableName) | |
// Incoming data | |
private val incoming = Seq(IncomingData(1, "UPDATED NAME")).toDS() | |
// Join and create inserting record and updating record | |
private val persistData = existing bridge incoming | |
// Insert new record | |
saveTable(User.tableName, persistData.map(_._1)) | |
// Update record for expiring existing record | |
update(persistData.map(_._2)) | |
session.stop | |
// Assert | |
private val set = retrieveAll() | |
set.foreach(println(_)) | |
private val expired = set.filter(_.versionNum == 1).head | |
assert(expired.id == 1) | |
assert(expired.versionNum == 1) | |
assert(expired.name == "TEST1") | |
assert(expired.expired) | |
private val inserted = set.filter(_.versionNum == 2).head | |
assert(inserted.id == 1) | |
assert(inserted.versionNum == 2) | |
assert(inserted.name == "UPDATED NAME") | |
assert(!inserted.expired) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment