Skip to content

Instantly share code, notes, and snippets.

@eduardorost
Last active September 1, 2019 01:38
Show Gist options
  • Save eduardorost/4ce7253949f6131ab6b01ab56ba1e3a4 to your computer and use it in GitHub Desktop.
Save eduardorost/4ce7253949f6131ab6b01ab56ba1e3a4 to your computer and use it in GitHub Desktop.
/** Serializes one change-stream event to JSON and stages it on `rddQueue`.
  *
  * The event is converted with `mapRecord`, written out as a JSON string and
  * wrapped in a single-element RDD. When the queue is empty that RDD is
  * enqueued as is; otherwise the head RDD is dequeued, unioned with the new
  * one, and the combined RDD is enqueued.
  *
  * @param changeDocument the change-stream event to stage
  */
def process(changeDocument: ChangeStreamDocument[Document]) = {
  // Presumably picked up implicitly by `write` below.
  implicit val formats: DefaultFormats.type = DefaultFormats

  val json = write(mapRecord(changeDocument))
  val incoming = spark.sparkContext.makeRDD(List(json))

  // NOTE(review): the emptiness check and the dequeue are separate steps; if
  // another thread also consumes `rddQueue`, confirm this cannot race.
  val staged =
    if (rddQueue.isEmpty) incoming
    else rddQueue.dequeue().union(incoming)

  rddQueue.enqueue(staged)
}
/** Flat, mostly string-based representation of one MongoDB change-stream event,
  * as built by `mapRecord` in this file.
  *
  * @param token         JSON rendering of the event's resume token
  * @param document      JSON rendering of the event's full document
  * @param operationType value of the event's operation type
  * @param removedFields field names reported as removed by the update
  *                      description; empty when that cannot be read
  * @param updatedFields JSON rendering of the update description's updated
  *                      fields; `""` when that cannot be read
  * @param timestamp     epoch milliseconds taken from the local clock when the
  *                      record was built
  */
case class MongoChangeRecord(
  token: String,
  document: String,
  operationType: String,
  removedFields: Seq[String],
  updatedFields: String,
  timestamp: Long
)
/** Flattens a change-stream event into a [[MongoChangeRecord]].
  *
  * Optional parts of the event degrade to neutral values instead of failing:
  * a missing full document or unreadable updated fields become `""`, and
  * unreadable removed fields become an empty list.
  *
  * @param doc the change-stream event to convert
  * @return the flattened record, stamped with the current wall-clock time in
  *         epoch milliseconds
  */
def mapRecord(doc: ChangeStreamDocument[Document]): MongoChangeRecord = {
  // The driver returns null here for events that carry no full document
  // (e.g. deletes); calling toJson directly would throw a NullPointerException.
  val fullDocumentJson = Option(doc.getFullDocument).map(_.toJson).getOrElse("")

  // The update description is only present on update events; Try keeps the
  // original best-effort handling. toList copies the Java-backed view into an
  // immutable Scala list so the record does not alias the driver's list.
  val removedFields: Seq[String] =
    Try(doc.getUpdateDescription.getRemovedFields.asScala.toList).getOrElse(List.empty)
  val updatedFieldsJson =
    Try(doc.getUpdateDescription.getUpdatedFields.toJson).getOrElse("")

  MongoChangeRecord(
    token = doc.getResumeToken.toJson,
    document = fullDocumentJson,
    operationType = doc.getOperationType.getValue,
    removedFields = removedFields,
    updatedFields = updatedFieldsJson,
    // Same value as Calendar.getInstance().getTimeInMillis, without the allocation.
    timestamp = System.currentTimeMillis()
  )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment