/FilterFunction.scala Secret
Created
April 15, 2021 11:41
Sample RichCoFlatMap with state
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.booking.test.functions | |
import com.booking.test.models.{BookingCreated, BookingEvent, FilteredEvent} | |
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction | |
import org.apache.flink.util.Collector | |
import org.apache.flink.configuration.Configuration | |
import org.apache.flink.api.common.state.ValueStateDescriptor | |
import org.apache.flink.api.common.state.ValueState | |
import org.apache.flink.streaming.api.scala._ | |
import org.apache.logging.log4j.scala.Logging | |
import org.apache.logging.log4j.Level | |
class FilterFunction() extends RichCoFlatMapFunction[BookingCreated,BookingEvent,FilteredEvent] with Logging { | |
lazy val bookingCreateState: ValueState[BookingCreated] = getRuntimeContext.getState(new ValueStateDescriptor[BookingCreated]("booking creation", classOf[BookingCreated])) | |
lazy val count: ValueState[Int] = getRuntimeContext.getState(new ValueStateDescriptor[Int]("event count", classOf[Int])) | |
lazy val bookingEventState: ValueState[BookingEvent] = getRuntimeContext.getState(new ValueStateDescriptor[BookingEvent]("booking event", classOf[BookingEvent])) | |
override def flatMap1(value: BookingCreated, out: Collector[FilteredEvent]): Unit = { | |
try { | |
val bookingCreationEvent = bookingCreateState.value | |
if (bookingEvent == null) { | |
bookingCreateState.update(value) | |
} | |
logger.info("bookingCreationState updated value"+bookingCreateState.value) | |
val eventCount = count.value | |
val bookingEvent = bookingEventState.value | |
if (value.travelerCount == eventCount) { | |
count.clear() | |
bookingEventState.clear() | |
bookingCreationEvent.clear() | |
out.collect(FilteredEvent(value.pnr,value.membershipnumber)) | |
} | |
}catch{ | |
case ex: Throwable =>{ | |
logger.info("filter func booking"+ex.getMessage()) | |
} | |
} | |
} | |
override def flatMap2(value: BookingEvent, out: Collector[FilteredEvent]): Unit = { | |
try { | |
var eventCount = if (count.value != null){ | |
count.value | |
} else { | |
0 | |
} | |
logger.info("event counter"+eventCount) | |
eventCount += 1 | |
var bookingCreationEvent = bookingCreateState.value | |
var bookingEventPassengerCount = if (bookingCreationEvent != null) bookingCreationEvent.travelerCount else 0 | |
logger.info("bookingEventPassengerCount"+bookingEventPassengerCount) | |
if (bookingEventState.value == null) bookingEventState.update(value) | |
if (bookingEventPassengerCount == eventCount) { | |
count.clear() | |
bookingEventState.clear() | |
bookingCreationEvent.clear() | |
out.collect(FilteredEvent(bookingCreationEvent.pnr,bookingCreationEvent.membershipnumber)) | |
} else { | |
count.update(eventCount) | |
logger.info("event count updated value"+eventCount.value) | |
} | |
}catch { | |
case ex: Throwable =>{ | |
logger.info("filter function"+ex.getMessage()) | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment