Skip to content

Instantly share code, notes, and snippets.

@ninadingole
Created May 25, 2018 14:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ninadingole/d9a671e004ee7373e345131430a0f6d6 to your computer and use it in GitHub Desktop.
Save ninadingole/d9a671e004ee7373e345131430a0f6d6 to your computer and use it in GitHub Desktop.
/** Opens a stream over the topic that `Utils.getTopic("movies")` resolves to,
  * with `String` keys and `movie.Envelope` values.
  */
private def buildMovieStream: KStreamS[String, movie.Envelope] = {
  // Presumably supplies the implicit consumer settings `builder.stream` needs — confirm in AppSerdes.
  import AppSerdes.movieSerde.consumed
  val moviesTopic = Utils.getTopic("movies")
  builder.stream[String, movie.Envelope](moviesTopic)
}
/** Opens a stream over the topic that `Utils.getTopic("sales")` resolves to,
  * with `String` keys and `Envelope` values.
  */
private def buildMovieSalesStream = {
  // Presumably supplies the implicit consumer settings `builder.stream` needs — confirm in AppSerdes.
  import AppSerdes.movieSalesSerde.consumed
  val salesTopic = Utils.getTopic("sales")
  builder.stream[String, Envelope](salesTopic)
}
// Source streams, built once here and reused by the filter/event definitions that follow.
// They must be initialised before any statement that assembles a pipeline from them.
val movieStream = buildMovieStream
val saleStream = buildMovieSalesStream
/** Narrows `saleStream` to records whose `op` field equals "c", ignoring case
  * (presumably the marker for a create operation — confirm against the envelope schema).
  *
  * Prints a trace line for every record inspected.
  */
private def filterSalesStreamForCreations =
  saleStream.filter { (_, envelope) =>
    println("filtering sales creation message")
    envelope.op.equalsIgnoreCase("c")
  }
/** Joins movie-creation records with sale-creation records on `movie_id` and
  * emits one `BusinessEvent` of type `MOVIECREATEEVENT` per matched pair.
  *
  * Both inputs are re-keyed by `after.movie_id` before the join. The event
  * payload is a map holding the Avro-serialized movie under "movie" and the
  * Avro-serialized sale under "sale".
  *
  * @return the joined stream of business events, keyed by movie id
  */
def createMovieBusinessEvent = {
  import AppSerdes.movieBEventSerde.{joined, salesSerialized}

  // One serializer shared by every joined record, rather than constructing and
  // configuring a new instance per record. It is lazy so that `schemaConfig`
  // is read when the first record is processed, not while the topology is
  // being assembled.
  lazy val avroSerializer = {
    val configured = new KafkaAvroSerializer()
    configured.configure(schemaConfig, false) // second argument is isKey: used for values
    configured
  }

  val createdMovies = new MovieCreatedFilter().filter(movieStream)
  val createdSales = filterSalesStreamForCreations

  // NOTE(review): `.get` throws if `after` or `movie_id` is empty; presumably the
  // creation filters guarantee both are present — confirm.
  val moviesById: KStreamS[Int, Movie] =
    createdMovies.map { (_, envelope) =>
      val created = envelope.after.get
      (created.movie_id.get, created)
    }
  val salesById = createdSales.map { (_, envelope) =>
    val created = envelope.after.get
    (created.movie_id.get, created)
  }

  moviesById.join(
    salesById,
    (createdMovie: Movie, movieSale: MovieSales) => {
      println("Created Business Event")
      val movieBytes =
        avroSerializer.serialize(Utils.getTopic("movie"), AppSerdes.movieBEventSerde.movieFormat.to(createdMovie))
      val saleBytes =
        avroSerializer.serialize(Utils.getTopic("movie_sales"), AppSerdes.movieBEventSerde.saleFormat.to(movieSale))
      BusinessEvent(EventTypes.`MOVIECREATEEVENT`, Map("movie" -> movieBytes, "sale" -> saleBytes))
    },
    JoinWindows.of(3000) // join window passed to the Kafka Streams API, in milliseconds
  )
}
/** Sends the stream produced by `createMovieBusinessEvent` to the "events" topic. */
def emitMovieBussinessEventToTopic = {
  // Presumably provides the implicit producer settings required by `to` — confirm in AppSerdes.
  import AppSerdes.movieBEventSerde.eventProduced
  val createEvents = createMovieBusinessEvent
  createEvents.to("events")
}
// Bare invocation: attaches the movie-creation event pipeline when the enclosing body is evaluated.
emitMovieBussinessEventToTopic
/** Maps movie-update records to `BusinessEvent`s of type `MOVIEUPDATEEVENT`.
  *
  * Each output record is keyed by the updated movie's `movie_id` and carries a
  * map holding the Avro-serialized previous state under "before" and the
  * Avro-serialized new state under "after".
  *
  * @return the stream of update events, keyed by movie id
  */
def createMovieUpdateEvent = {
  // One serializer shared by every record, rather than constructing and
  // configuring a new instance per record. It is lazy so that `schemaConfig`
  // is read when the first record is processed, not while the topology is
  // being assembled.
  lazy val avroSerializer = {
    val configured = new KafkaAvroSerializer()
    configured.configure(schemaConfig, false) // second argument is isKey: used for values
    configured
  }

  val updatedMovies = new MovieUpdateFilter().filter(movieStream)

  updatedMovies.map { (_, envelope) =>
    // NOTE(review): `.get` throws if `before`, `after` or `movie_id` is empty;
    // presumably the update filter guarantees they are present — confirm.
    val before = envelope.before.get
    val after = envelope.after.get
    val beforeBytes = avroSerializer.serialize("events", AppSerdes.movieBEventSerde.movieFormat.to(before))
    val afterBytes = avroSerializer.serialize("events", AppSerdes.movieBEventSerde.movieFormat.to(after))
    val payload = Map("before" -> beforeBytes, "after" -> afterBytes)
    (after.movie_id.get, BusinessEvent(EventTypes.`MOVIEUPDATEEVENT`, payload))
  }
}
/** Sends the stream produced by `createMovieUpdateEvent` to the "events" topic. */
def emitMovieUpdateEvent = {
  // Presumably provides the implicit producer settings required by `to` — confirm in AppSerdes.
  import AppSerdes.movieBEventSerde.eventProduced
  val updateEvents = createMovieUpdateEvent
  updateEvents.to("events")
}
// Bare invocation: attaches the movie-update event pipeline when the enclosing body is evaluated.
emitMovieUpdateEvent
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment