Skip to content

Instantly share code, notes, and snippets.

@ariskk
Created February 3, 2019 20:06
Show Gist options
  • Save ariskk/c489d1ac24dcf0b94bf5a723006b8c73 to your computer and use it in GitHub Desktop.
Save ariskk/c489d1ac24dcf0b94bf5a723006b8c73 to your computer and use it in GitHub Desktop.
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.functions.co.{CoMapFunction, RichCoMapFunction}
import org.apache.flink.streaming.api.scala.{ConnectedStreams, DataStream}
import org.apache.flink.streaming.api.scala._
/**
 * Element type of the first input stream.
 *
 * @param key value the stream is partitioned by
 * @param foo payload carried by this side of the join
 */
case class M1(
  key: String,
  foo: String
)
/**
 * Element type of the second input stream.
 *
 * @param key value the stream is partitioned by
 * @param bar payload carried by this side of the join
 */
case class M2(
  key: String,
  bar: String
)
/**
 * What has been seen so far from each of the two inputs.
 *
 * @param maybeM1 element from the first input, if one has been stored
 * @param maybeM2 element from the second input, if one has been stored
 */
case class State(maybeM1: Option[M1], maybeM2: Option[M2]) {

  /**
   * The combined record: defined only when both sides are present.
   * It takes its key from the `M1` side and concatenates `foo` and `bar`.
   */
  lazy val out: Option[Out] =
    maybeM1.flatMap { left =>
      maybeM2.map(right => Out(left.key, left.foo + right.bar))
    }
}
/**
 * Result record produced once both sides are available.
 *
 * @param key     key of the combined record
 * @param foodbar combined payload of the two sides
 */
case class Out(
  key: String,
  foodbar: String
)
// Execution environment the example pipeline is built on.
val env: StreamExecutionEnvironment =
  StreamExecutionEnvironment.getExecutionEnvironment

// First input: two M1 elements (keys "a" and "b"), partitioned by `key`.
val keyedS1: KeyedStream[M1, String] =
  env
    .fromElements(M1("a", "fooA"), M1("b", "fooB"))
    .keyBy(m1 => m1.key)

// Second input: two M2 elements (keys "a" and "c"), partitioned by `key`.
val keyedS2: KeyedStream[M2, String] =
  env
    .fromElements(M2("a", "barA"), M2("c", "barC"))
    .keyBy(m2 => m2.key)
/**
 * Connects the two keyed inputs and combines them through Flink state.
 *
 * Each incoming element replaces its side of the stored [[State]], and the
 * function then emits that state's `out`: `None` while only one side has been
 * stored, `Some(Out(...))` once both are present. After both sides are known,
 * every further element on either side emits a fresh `Some`.
 */
lazy val stream = keyedS1.connect(keyedS2).map(
  new RichCoMapFunction[M1, M2, Option[Out]] {
    lazy val stateTypeInfo: TypeInformation[State] = implicitly[TypeInformation[State]]
    lazy val serializer: TypeSerializer[State] =
      stateTypeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
    lazy val stateDescriptor = new ValueStateDescriptor[State]("state", serializer)

    // Resolve the state handle once, on first use, instead of asking the
    // runtime context for it on every record. It is first touched from
    // map1/map2, i.e. when the runtime context is available; @transient keeps
    // the handle out of the serialized function object.
    @transient private lazy val keyedState = getRuntimeContext.getState(stateDescriptor)

    /**
     * Applies `f` to the stored state (or an empty `State` when nothing has
     * been stored yet), writes the result back and returns its `out`.
     */
    private def updateAndEmit(f: State => State): Option[Out] = {
      // `value` may be null when nothing has been stored, hence the Option wrapper.
      val previous = Option(keyedState.value).getOrElse(State(None, None))
      val next = f(previous)
      keyedState.update(next)
      next.out
    }

    /** Stores the latest element of the first input and emits the combined record, if any. */
    override def map1(in: M1): Option[Out] =
      updateAndEmit(_.copy(maybeM1 = Option(in)))

    /** Stores the latest element of the second input and emits the combined record, if any. */
    override def map2(in: M2): Option[Out] =
      updateAndEmit(_.copy(maybeM2 = Option(in)))
  }
)
// Collect the output of the pipeline into a local list and print it.
// The pipeline defined in this snippet is named `stream`; the previous
// reference to `stream2` pointed at a name that is not defined here.
val results = new DataStreamUtils(stream).collect.toList
println(results)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment