Skip to content

Instantly share code, notes, and snippets.

@mohdizzy
Created April 15, 2021 11:41
Sample RichCoFlatMap with state
package com.booking.test.functions
import com.booking.test.models.{BookingCreated, BookingEvent, FilteredEvent}
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
import org.apache.flink.util.Collector
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.streaming.api.scala._
import org.apache.logging.log4j.scala.Logging
import org.apache.logging.log4j.Level
class FilterFunction() extends RichCoFlatMapFunction[BookingCreated,BookingEvent,FilteredEvent] with Logging {
lazy val bookingCreateState: ValueState[BookingCreated] = getRuntimeContext.getState(new ValueStateDescriptor[BookingCreated]("booking creation", classOf[BookingCreated]))
lazy val count: ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("event count", classOf[Int]))
lazy val bookingEventState: ValueState[BookingEvent] = getRuntimeContext.getState(new ValueStateDescriptor[BookingEvent]("booking event", classOf[BookingEvent]))
override def flatMap1(value: BookingCreated, out: Collector[FilteredEvent]): Unit = {
try {
val bookingCreationEvent = bookingCreateState.value
if (bookingEvent == null) {
bookingCreateState.update(value)
}
logger.info("bookingCreationState updated value"+bookingCreateState.value)
val eventCount = count.value
val bookingEvent = bookingEventState.value
if (value.travelerCount == eventCount) {
count.clear()
bookingEventState.clear()
bookingCreationEvent.clear()
out.collect(FilteredEvent(value.pnr,value.membershipnumber))
}
}catch{
case ex: Throwable =>{
logger.info("filter func booking"+ex.getMessage())
}
}
}
override def flatMap2(value: BookingEvent, out: Collector[FilteredEvent]): Unit = {
try {
var eventCount = if (count.value != null){
count.value
} else {
0
}
logger.info("event counter"+eventCount)
eventCount += 1
var bookingCreationEvent = bookingCreateState.value
var bookingEventPassengerCount = if (bookingCreationEvent != null) bookingCreationEvent.travelerCount else 0
logger.info("bookingEventPassengerCount"+bookingEventPassengerCount)
if (bookingEventState.value == null) bookingEventState.update(value)
if (bookingEventPassengerCount == eventCount) {
count.clear()
bookingEventState.clear()
bookingCreationEvent.clear()
out.collect(FilteredEvent(bookingCreationEvent.pnr,bookingCreationEvent.membershipnumber))
} else {
count.update(eventCount)
logger.info("event count updated value"+eventCount.value)
}
}catch {
case ex: Throwable =>{
logger.info("filter function"+ex.getMessage())
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment