Created
May 25, 2018 14:32
-
-
Save ninadingole/d9a671e004ee7373e345131430a0f6d6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Opens the source stream of movie change envelopes.
 *
 * The topic name is resolved through `Utils.getTopic("movies")`; records are
 * read with `String` keys and `movie.Envelope` values.
 */
private def buildMovieStream: KStreamS[String, movie.Envelope] = {
  // Presumably the implicit consumer configuration that `builder.stream` needs.
  import AppSerdes.movieSerde.consumed

  val moviesTopic = Utils.getTopic("movies")
  builder.stream[String, movie.Envelope](moviesTopic)
}
/**
 * Opens the source stream of movie-sales change envelopes.
 *
 * The topic name is resolved through `Utils.getTopic("sales")`; records are
 * read with `String` keys and `Envelope` values.
 */
private def buildMovieSalesStream = {
  // Presumably the implicit consumer configuration that `builder.stream` needs.
  import AppSerdes.movieSalesSerde.consumed

  val salesTopic = Utils.getTopic("sales")
  builder.stream[String, Envelope](salesTopic)
}
// Source streams are created once and reused: `movieStream` feeds both the
// "movie created" and the "movie updated" pipelines, `saleStream` feeds the
// sales-creation filter. Types are spelled out because these are
// non-private members.
val movieStream: KStreamS[String, movie.Envelope] = buildMovieStream
val saleStream: KStreamS[String, Envelope] = buildMovieSalesStream
/**
 * Narrows `saleStream` to the envelopes whose operation code is `"c"`
 * (compared case-insensitively).
 *
 * Records with a `null` value (e.g. tombstones) or a `null` `op` are
 * rejected instead of raising a `NullPointerException` inside the stream
 * thread.
 *
 * @return the filtered sales stream
 */
private def filterSalesStreamForCreations = {
  saleStream
    .filter((_, value) => {
      println("filtering sales creation message")
      // Literal on the left: `equalsIgnoreCase(null)` is simply false.
      value != null && "c".equalsIgnoreCase(value.op)
    })
}
/**
 * Builds the stream of "movie created" business events.
 *
 * Created-movie envelopes and created-sale envelopes are both re-keyed by
 * `movie_id` and joined within `JoinWindows.of(3000)`. Each joined pair is
 * turned into a `BusinessEvent` of type `MOVIECREATEEVENT` whose payload map
 * holds the Avro-serialized movie under `"movie"` and the Avro-serialized
 * sale under `"sale"`.
 *
 * NOTE(review): `after` and `movie_id` are unwrapped with `.get`, so an
 * envelope lacking either fails the record with `NoSuchElementException`;
 * presumably the upstream filters guarantee they are present - confirm.
 *
 * @return the joined stream keyed by movie id
 */
def createMovieBusinessEvent = {
  import AppSerdes.movieBEventSerde.{joined, salesSerialized}

  // One serializer for the whole pipeline instead of a fresh, freshly
  // configured instance per joined record. It is `lazy` so that
  // `schemaConfig` is still read when the first record is processed, exactly
  // as before, rather than while the topology is being assembled.
  lazy val serializer = {
    val avro = new KafkaAvroSerializer()
    avro.configure(schemaConfig, false)
    avro
  }

  val movieFilteredStream = new MovieCreatedFilter().filter(movieStream)
  val salesFilteredStream = filterSalesStreamForCreations

  // Re-key both sides by movie id so they can be joined.
  val envelopExtractedMovie: KStreamS[Int, Movie] =
    movieFilteredStream.map((_, value) => {
      val created = value.after.get
      (created.movie_id.get, created)
    })
  val envelopeExtractedSale = salesFilteredStream.map((_, value) => {
    val created = value.after.get
    (created.movie_id.get, created)
  })

  envelopExtractedMovie.join(envelopeExtractedSale, (movie: Movie, movieSale: MovieSales) => {
    println("Created Business Event")
    val movieBytes =
      serializer.serialize(Utils.getTopic("movie"), AppSerdes.movieBEventSerde.movieFormat.to(movie))
    // Named `saleBytes` so it no longer shadows the imported `salesSerialized`.
    val saleBytes =
      serializer.serialize(Utils.getTopic("movie_sales"), AppSerdes.movieBEventSerde.saleFormat.to(movieSale))
    BusinessEvent(EventTypes.`MOVIECREATEEVENT`, Map("movie" -> movieBytes, "sale" -> saleBytes))
  }, JoinWindows.of(3000))
}
/**
 * Sends every event produced by `createMovieBusinessEvent` to the topic
 * named `"events"`.
 */
def emitMovieBussinessEventToTopic = {
  // Presumably the implicit producer configuration that `to` needs.
  import AppSerdes.movieBEventSerde.eventProduced

  val movieCreatedEvents = createMovieBusinessEvent
  movieCreatedEvents.to("events")
}

// Evaluated as a plain statement of the enclosing body, which attaches the
// "movie created" sink.
emitMovieBussinessEventToTopic
/**
 * Builds the stream of "movie updated" business events.
 *
 * Update envelopes selected by `MovieUpdateFilter` are mapped to a pair of
 * the movie id (taken from the `after` state) and a `BusinessEvent` of type
 * `MOVIEUPDATEEVENT`, whose payload map holds the Avro-serialized `before`
 * and `after` states under the keys `"before"` and `"after"`.
 *
 * NOTE(review): `before`, `after` and `movie_id` are unwrapped with `.get`,
 * so an envelope lacking any of them fails the record with
 * `NoSuchElementException`; presumably the filter guarantees they are
 * present - confirm.
 *
 * @return the update-event stream keyed by movie id
 */
def createMovieUpdateEvent = {
  // One serializer for the whole pipeline instead of a fresh, freshly
  // configured instance per record. It is `lazy` so that `schemaConfig` is
  // still read when the first record is processed, exactly as before, rather
  // than while the topology is being assembled.
  lazy val serializer = {
    val avro = new KafkaAvroSerializer()
    avro.configure(schemaConfig, false)
    avro
  }

  val updateStream = new MovieUpdateFilter().filter(movieStream)
  updateStream.map((_, envelop) => {
    val before = envelop.before.get
    val after = envelop.after.get
    val payload = Map(
      "before" -> serializer.serialize("events", AppSerdes.movieBEventSerde.movieFormat.to(before)),
      "after" -> serializer.serialize("events", AppSerdes.movieBEventSerde.movieFormat.to(after))
    )
    (after.movie_id.get, BusinessEvent(EventTypes.`MOVIEUPDATEEVENT`, payload))
  })
}
/**
 * Sends every event produced by `createMovieUpdateEvent` to the topic named
 * `"events"`.
 */
def emitMovieUpdateEvent = {
  // Presumably the implicit producer configuration that `to` needs.
  import AppSerdes.movieBEventSerde.eventProduced

  val movieUpdatedEvents = createMovieUpdateEvent
  movieUpdatedEvents.to("events")
}

// Evaluated as a plain statement of the enclosing body, which attaches the
// "movie updated" sink.
emitMovieUpdateEvent
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment