Skip to content

Instantly share code, notes, and snippets.

@rionmonster
Created January 29, 2020 05:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rionmonster/8a216817c101e5cd1eb55572eaddfb30 to your computer and use it in GitHub Desktop.
Save rionmonster/8a216817c101e5cd1eb55572eaddfb30 to your computer and use it in GitHub Desktop.
Users Preprocessing
/**
 * Final stage of the pipeline: enriches raw events with the users and IPs produced by the earlier stages
 * and publishes the result.
 *
 * Raw events from [Topics.RAW_EVENTS] are left-joined (by record key) first against the user arrays in
 * [Topics.PROCESSING_USERS] and then against the IP arrays in [Topics.PROCESSING_IPS]. Records for which a
 * joiner yields null are dropped; the survivors are converted/re-keyed via
 * [EventProcessor.mapToFinalEventAndRekey] and written to [Topics.FINAL_EVENTS].
 */
object EventProcessing {
    @JvmStatic
    fun main(args: Array<String>) {
        val streamsManager = StreamsManager()
        // Business logic that merges enrichment results into an event and re-keys the final output
        val eventProcessor = EventProcessor()
        // The raw events that originated everything
        val events = streamsManager.createKStream<RawEvent>(Topics.RAW_EVENTS)
        // Arrays of enriched entities produced by the user / IP stages; joined to the events by record key
        val users = streamsManager.createArrayKTable<ProcessingUser>(Topics.PROCESSING_USERS)
        val ips = streamsManager.createArrayKTable<ProcessingIP>(Topics.PROCESSING_IPS)

        // Append every enrichment created through the pipeline to the raw event.
        // NOTE(review): in a KStream-KTable leftJoin, Kafka Streams calls the joiner with a null table value
        // when the key has no table entry yet - confirm enrichWithUsers / enrichWithIps tolerate that.
        // NOTE(review): only stream-side records trigger a KStream-KTable join, so each raw event is joined once,
        // against whatever the tables hold when it arrives; if a joiner returns null (presumably "enrichment not
        // complete yet") the filters below drop the event and it is not retried.
        val finalEvents = events
            // `events` carries RawEvent values, so the first joiner's left type is RawEvent (it produces FinalEvent)
            .leftJoin(users, ValueJoiner<RawEvent, ArrayList<ProcessingUser>, FinalEvent>(eventProcessor::enrichWithUsers))
            .filter { _, event -> event != null }
            .leftJoin(ips, ValueJoiner<FinalEvent, ArrayList<ProcessingIP>, FinalEvent>(eventProcessor::enrichWithIps))
            .filter { _, event -> event != null }

        // Everything that made it through all enrichments is converted, re-keyed and pushed to the final topic
        finalEvents
            .filter { key, event -> key != null && event != null }
            // Use the EventProcessor declared above; this function has no other processor in scope
            .map { _, event -> eventProcessor.mapToFinalEventAndRekey(event) }
            .to(Topics.FINAL_EVENTS, Produced.with(Serdes.String, Serdes.getSerde<FinalEvent>()))

        streamsManager.execute()
    }
}
/**
 * Extracts candidate users from each raw event and routes the result by whether any were found.
 *
 * Every record from [Topics.RAW_EVENTS] is mapped through [UserPreprocessor.extractUsers]:
 *  - non-empty candidate lists go to [Topics.PREPROCESSING_USERS], where per-user enrichment happens;
 *  - empty lists go straight to [Topics.PROCESSING_USERS], since there is no user to enrich and the
 *    event can move on to event enrichment.
 */
object UserPreprocessing {
    @JvmStatic
    fun main(args: Array<String>) {
        val streamsManager = StreamsManager()
        val preprocessor = UserPreprocessor()

        // Produced config for a String key and an array of ProcessingUser; built anew for each sink
        fun userArrayProduced() =
            Produced.with(Serdes.String, Serdes.getArraySerde<ProcessingUser>())

        // One array of 0-N candidate users per raw event, keyed like the raw event
        val candidates = streamsManager
            .createKStream<RawEvent>(Topics.RAW_EVENTS)
            .mapValues(preprocessor::extractUsers)

        // At least one user found: hand off to the user-enrichment stage
        candidates
            .filter { _, found -> found.isNotEmpty() }
            .to(Topics.PREPROCESSING_USERS, userArrayProduced())

        // Nothing found: publish the empty array directly so event enrichment is not blocked
        candidates
            .filter { _, found -> found.isEmpty() }
            .to(Topics.PROCESSING_USERS, userArrayProduced())

        streamsManager.execute()
    }
}
/**
 * Enriches candidate users against the table of known users and fans the results out to two topics.
 *
 * Arrays of candidates from [Topics.PREPROCESSING_USERS] are split into individual records, and each one is
 * left-joined against the global table built from [Topics.FINAL_USERS] (lookup key from
 * [UserProcessor.extractKey], merge via [UserProcessor.enrichWithExistingUser]). Enriched users are then:
 *  - re-collected per record key into an array without duplicates (state store "enriched_users") and sent to
 *    [Topics.PROCESSING_USERS] for event enrichment;
 *  - individually converted/re-keyed via [UserProcessor.mapToUserAndRekey] and, when the mapped value is
 *    non-null, written back to [Topics.FINAL_USERS] so the global table and any sink connectors see the change.
 *
 * NOTE(review): the aggregated KTable is streamed out on every update, so consumers of PROCESSING_USERS may see
 * partial arrays before all candidates of one event have been enriched.
 * NOTE(review): this app both reads and writes FINAL_USERS; the global table is updated asynchronously, so two
 * candidates for the same new user arriving close together may both miss the join - confirm that is acceptable.
 */
object UserProcessing {
    @JvmStatic
    fun main(args: Array<String>) {
        val streamsManager = StreamsManager()
        // Global view of every known user, used to look up an existing record for each candidate
        val knownUsers = streamsManager.createGlobalKTable<User>(Topics.FINAL_USERS)
        // Candidate users extracted from raw events, one array per event record
        val candidateBatches = streamsManager.createArrayKStream<ProcessingUser>(Topics.PREPROCESSING_USERS)
        // Business logic for keying, enriching and converting users
        val processor = UserProcessor()

        // Flatten each batch into single candidates, then left-join each against the known users
        val enrichedUsers = candidateBatches
            .flatMapValues { _, batch -> batch }
            .leftJoin(
                knownUsers,
                KeyValueMapper { _, candidate -> processor.extractKey(candidate.user) },
                ValueJoiner<ProcessingUser, User, ProcessingUser>(processor::enrichWithExistingUser)
            )

        // Re-assemble the enriched candidates per key (skipping equal duplicates) for event enrichment
        enrichedUsers
            .groupByKey(Grouped.with(Serdes.String, Serdes.getSerde<ProcessingUser>()))
            .aggregate(
                { ArrayList() },
                { _, candidate, collected ->
                    collected.apply { if (candidate !in this) add(candidate) }
                },
                streamsManager.materializeTo("enriched_users", Serdes.getArraySerde<ProcessingUser>())
            )
            .toStream()
            .to(Topics.PROCESSING_USERS, Produced.with(Serdes.String, Serdes.getArraySerde<ProcessingUser>()))

        // Publish each enriched user's latest version; records whose mapped value is null are dropped
        enrichedUsers
            .map { _, enriched -> processor.mapToUserAndRekey(enriched) }
            .filter { _, mapped -> mapped != null }
            .to(Topics.FINAL_USERS, Produced.with(Serdes.String, Serdes.getSerde<User>()))

        streamsManager.execute()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment