@chetkhatri
Created July 3, 2020 13:04
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

object DedupWriter extends Serializable {
  def upsertIntoDeduped(microBatchOutput: DataFrame, batchId: Long): Unit = {
    // Per the original "all columns match" intent: match only when every column is equal.
    // <=> is null-safe equality, so rows containing nulls still compare as duplicates.
    val allColumnsMatch = microBatchOutput.columns
      .map(c => s"`out`.`$c` <=> `in`.`$c`")
      .mkString(" AND ")

    DeltaTable.forPath("/mnt/somebucket/ip_index_deduped_updates.delta").as("out")
      .merge(microBatchOutput.as("in"), allColumnsMatch)
      .whenNotMatched()
      .insertAll()
      .execute()
  }
}
spark
  .readStream
  .format("delta")
  // Updates rewrite files in the source table; ignoreChanges re-emits the rows of those files
  .option("ignoreChanges", "true")
  .load("/mnt/aggregates/prod/some_table_that_receives_upserts.delta")
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", CHECKPOINT_PATH)
  // Each micro-batch is merged into the deduped table via the insert-only merge above
  .foreachBatch(DedupWriter.upsertIntoDeduped _)
  .start()
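Because ignoreChanges re-emits every row of any rewritten file in the source table, a micro-batch can contain rows that already exist in the target; the insert-only merge above simply skips them. A minimal sanity check one could run against the deduped output (a sketch, assuming the path from the gist and grouping on the table's full column list):

import org.apache.spark.sql.functions.col

val deduped = spark.read.format("delta")
  .load("/mnt/somebucket/ip_index_deduped_updates.delta")

// Count groups of fully identical rows that occur more than once; expect zero after dedup
val duplicateGroups = deduped
  .groupBy(deduped.columns.map(col): _*)
  .count()
  .filter(col("count") > 1)
  .count()

assert(duplicateGroups == 0, s"found $duplicateGroups groups of duplicated rows")
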
@chetkhatri (Author) commented Jul 3, 2020
// silverTable is assumed to be a DeltaTable handle for the silver table (see the sketch below)
def upsert(microbatch: DataFrame, batchId: Long): Unit = {
    val batchDf = microbatch
    silverTable.as("m")
      .merge(
          batchDf.as("b"),
          "m.record_key_hash = b.record_key_hash"
      )
      // Update only when the record payload has actually changed
      .whenMatched("m.record_hash <> b.record_hash").updateExpr(Map(
          "m.name" -> "b.name"
      ))
      .whenNotMatched().insertAll()
      .execute()
}
spark.readStream
  .option("ignoreChanges", "true")
  .format("delta")
  .load("<bronze-table>")
  .writeStream
  .format("delta")
  .foreachBatch(upsert _)
  .outputMode("update")
  .option("checkpointLocation", "out/checkpoint/2")
  .start("<silver-table>")
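silverTable is not defined in the snippet; it is presumably a DeltaTable handle for the silver table. A minimal sketch of how it could be obtained, assuming the silver table is addressed by path just like the "<silver-table>" placeholder above:

import io.delta.tables.DeltaTable

// Hypothetical handle for the silver table; "<silver-table>" is the same placeholder path used above
val silverTable = DeltaTable.forPath(spark, "<silver-table>")
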

@dwai1714 commented Jul 3, 2020

In Python, there is no table option.
