Skip to content

Instantly share code, notes, and snippets.

@ninadingole
Created May 25, 2018 14:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ninadingole/e1d5f603f326d767e460041190ac2514 to your computer and use it in GitHub Desktop.
Save ninadingole/e1d5f603f326d767e460041190ac2514 to your computer and use it in GitHub Desktop.
/**
  * Opens a stream over the "events" topic with `Int` keys and `BusinessEvent` values.
  *
  * @return the stream produced by `builder.stream` for the "events" topic
  */
def buildEventStream: KStreamS[Int, BusinessEvent] = {
  // NOTE(review): presumably `eventConsumed` is the implicit Consumed/serde instance that
  // builder.stream resolves for (Int, BusinessEvent) — confirm against AppSerdes.
  import AppSerdes.movieBEventSerde.eventConsumed
  builder.stream[Int, BusinessEvent]("events")
}
// Stream returned by buildEventStream, evaluated once and held for reuse by filterEventsByType.
private val eventStreams: KStreamS[Int, BusinessEvent] = buildEventStream
/**
  * Narrows the shared event stream to the events of one type.
  *
  * An event is kept when its `eventType` equals the requested one, ignoring case.
  *
  * @param eventType name of the event type to keep
  * @return a stream holding only the matching events
  */
def filterEventsByType(eventType: String): KStreamS[Int, BusinessEvent] = {
  // The key is irrelevant for the decision; only the event payload is inspected.
  val hasRequestedType: (Int, BusinessEvent) => Boolean =
    (_, candidate) => candidate.eventType.equalsIgnoreCase(eventType)

  eventStreams.filter(hasRequestedType)
}
// Handles MOVIECREATEEVENT: decodes each event into a document and inserts it into `movies`.
filterEventsByType(EventTypes.`MOVIECREATEEVENT`).foreach((_, event) => {
  import scala.util.{Failure, Success}

  // The deserializer exposes its result as an Option; nothing is inserted when it is empty.
  new MovieCreatedEventDeserializer(event).get.foreach { doc =>
    movies.insertOne(document = doc).toFuture().onComplete {
      case Success(_) =>
        println(s"Inserted ${doc}")
      case Failure(cause) =>
        // Report the failure instead of claiming the document was inserted.
        Console.err.println(s"Failed to insert ${doc}: ${cause.getMessage}")
    }
  }
})
// Handles MOVIEUPDATEEVENT: looks up the stored movie by `movie_id`, carries its existing
// `sales` value over to the updated document, and replaces the stored document with it.
filterEventsByType(EventTypes.`MOVIEUPDATEEVENT`).foreach((_, event) => {
  import scala.util.{Failure, Success}

  // An event that does not decode to a document is skipped rather than throwing on `.get`.
  new MovieUpdateEventDeserializer(event).get.foreach { movie =>
    // Read the id once; it drives both the lookup and the log output.
    val movieId = movie.getInteger("movie_id")

    movies
      .find(Filters.eq("movie_id", movieId))
      .toFuture()
      .map(_.headOption) // an empty result is a normal outcome, not an exception
      .onComplete {
        case Success(Some(stored)) =>
          println(s"Replacing Movie Information ${movieId}")
          val replacement = movie.toBsonDocument
          // Keep the stored sales figure when there is one; the update event's document
          // is written as-is otherwise.
          stored.get("sales").foreach(sales => replacement.put("sales", sales))
          movies
            .replaceOne(Filters.eq("_id", stored.getObjectId("_id")), replacement)
            .toFuture()
            .onComplete {
              case Success(_) =>
                println("Movie Information Updated")
              case Failure(cause) =>
                Console.err.println(s"Failed to update movie ${movieId}: ${cause.getMessage}")
            }

        case Success(None) =>
          Console.err.println(s"No stored movie found for movie_id ${movieId}; update skipped")

        case Failure(cause) =>
          Console.err.println(s"Failed to look up movie ${movieId}: ${cause.getMessage}")
      }
  }
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment