Skip to content

Instantly share code, notes, and snippets.

@Zhen-hao
Last active March 31, 2019 19:29
Show Gist options
  • Save Zhen-hao/1a5cd65fa4037738dcc1ea782198e414 to your computer and use it in GitHub Desktop.
Save Zhen-hao/1a5cd65fa4037738dcc1ea782198e414 to your computer and use it in GitHub Desktop.
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream}
import org.apache.flink.streaming.api.scala._
import shapeless._
import record._
import shapeless.LabelledGeneric
import shapeless.labelled.FieldType
import shapeless.Witness
import shapeless.ops.record.Selector
import shapeless.syntax.RecordOps
import shapeless.syntax.singleton._
import shapeless.record._
/**
 * Type class that builds a table of named stream transformations for events of type `E`.
 *
 * `T` is the type that drives the derivation; the instances in the companion object cover
 * `HNil`, labelled `::` records and types that have a `LabelledGeneric`.
 *
 * @tparam T type the table is derived from
 * @tparam E event type carried by the input streams
 */
trait DispatchTableGen[T, E] extends Serializable {

  /**
   * Builds the table for the given stateful function.
   *
   * @param fun function receiving an event and the optional current state, returning the
   *            produced results together with the optional next state
   * @tparam R type of the produced results
   * @tparam S type of the state threaded through `fun`
   * @return a map from entry name to a `DataStream[E] => DataStream[R]` transformation
   */
  def dispatchTable[R: TypeInformation, S: TypeInformation](
    fun: (E, Option[S]) => (TraversableOnce[R], Option[S])
  ): Map[String, DataStream[E] => DataStream[R]]
}
/**
 * Lower-priority instances of [[DispatchTableGen]].
 *
 * The `DispatchTableGen` companion extends this trait, so the instance declared here is only
 * picked when none of the instances declared directly in the companion applies.
 */
trait LowPriorityDispatchTableGen extends Serializable {

  /**
   * Table entry for a record whose head field `K` is used as the key directly.
   *
   * The entry is named after the field (`wit.value.name`). Its transformation pairs every
   * event with the value of that field, keys the stream by that value and runs `fun` through
   * `flatMapWithState`. The entries of the record tail come from `rest`.
   *
   * @param lg       converts an event into its labelled record representation
   * @param wit      witness of the field name
   * @param selector reads field `K` from the record
   * @param ev       evidence that the selected field type is the key type `V`
   * @param tpInfoV  Flink type information for the key
   * @param pInfo    Flink type information for the (key, event) pair
   * @param rest     table generator for the remaining fields
   */
  implicit def lowHcons[K <: Symbol, OV, V, T <: HList, E, Rpr <: HList](
    implicit
    lg: LabelledGeneric.Aux[E, Rpr],
    wit: Witness.Aux[K],
    selector: Selector.Aux[Rpr, K, OV],
    ev: OV =:= V,
    tpInfoV: TypeInformation[V],
    pInfo: TypeInformation[(V, E)],
    rest: DispatchTableGen[T, E]
  ): DispatchTableGen[FieldType[K, V] :: T, E] =
    new DispatchTableGen[FieldType[K, V] :: T, E] {
      def dispatchTable[R: TypeInformation, S: TypeInformation](
        fun: (E, Option[S]) => (TraversableOnce[R], Option[S])
      ): Map[String, DataStream[E] => DataStream[R]] = {
        // The key is only needed for partitioning; the user function sees just the event.
        // A plain lambda is used instead of type patterns on V/E/S, which are erased at
        // runtime and therefore verify nothing.
        val funTrans: ((V, E), Option[S]) => (TraversableOnce[R], Option[S]) =
          (keyedEvent, state) => fun(keyedEvent._2, state)

        val transformation: DataStream[E] => DataStream[R] =
          (stream: DataStream[E]) =>
            stream
              .map(
                (e: E) => {
                  // Apply the type-equality evidence explicitly rather than as a hidden view.
                  val key: V = ev(selector(lg.to(e)))
                  (key, e)
                }
              )
              .keyBy((p: (V, E)) => p._1)
              .flatMapWithState(funTrans)

        val newEntry: (String, DataStream[E] => DataStream[R]) =
          wit.value.name -> transformation

        rest.dispatchTable(fun) + newEntry
      }
    }
}
/**
 * Instances of [[DispatchTableGen]] for `HNil`, for records whose head field is an `Option`,
 * and for types with a `LabelledGeneric` representation. The non-optional head-field case is
 * inherited from [[LowPriorityDispatchTableGen]].
 */
object DispatchTableGen extends LowPriorityDispatchTableGen {

  /** The empty record contributes no entries. */
  implicit def hnil[E]: DispatchTableGen[HNil, E] =
    new DispatchTableGen[HNil, E] {
      def dispatchTable[R: TypeInformation, S: TypeInformation](
        fun: (E, Option[S]) => (TraversableOnce[R], Option[S])
      ): Map[String, DataStream[E] => DataStream[R]] = Map.empty
    }

  /**
   * Table entry for a record whose head field `K` has type `Option[V]`.
   *
   * The entry is named after the field (`wit.value.name`). Events whose field is `None` are
   * dropped; the others are paired with the contained value, keyed by it and passed to `fun`
   * through `flatMapWithState`. The entries of the record tail come from `rest`.
   *
   * @param lg       converts an event into its labelled record representation
   * @param wit      witness of the field name
   * @param selector reads field `K` from the record
   * @param u        unpacks the field type `OV` as `Option[V]`
   * @param ev       evidence that the field type is `Option[V]`
   * @param tpInfoV  Flink type information for the key
   * @param pInfo    Flink type information for the (key, event) pair
   * @param rest     table generator for the remaining fields
   */
  implicit def hcons[K <: Symbol, OV, V, T <: HList, E, Rpr <: HList](
    implicit
    lg: LabelledGeneric.Aux[E, Rpr],
    wit: Witness.Aux[K],
    selector: Selector.Aux[Rpr, K, OV],
    u: Unpack1[OV, Option, V],
    ev: OV =:= Option[V],
    tpInfoV: TypeInformation[V],
    pInfo: TypeInformation[(V, E)],
    rest: DispatchTableGen[T, E]
  ): DispatchTableGen[FieldType[K, OV] :: T, E] =
    new DispatchTableGen[FieldType[K, OV] :: T, E] {
      def dispatchTable[R: TypeInformation, S: TypeInformation](
        fun: (E, Option[S]) => (TraversableOnce[R], Option[S])
      ): Map[String, DataStream[E] => DataStream[R]] = {
        // The key is only needed for partitioning; the user function sees just the event.
        // A plain lambda is used instead of type patterns on V/E/S, which are erased at
        // runtime and therefore verify nothing.
        val funTrans: ((V, E), Option[S]) => (TraversableOnce[R], Option[S]) =
          (keyedEvent, state) => fun(keyedEvent._2, state)

        val transformation: DataStream[E] => DataStream[R] =
          (stream: DataStream[E]) =>
            stream
              .flatMap(
                (event: E) => {
                  // Use the evidence to view the field as Option[V] instead of matching a
                  // value of the abstract type OV against Some/None.
                  val maybeKey: Option[V] = ev(selector(lg.to(event)))
                  val keyed: Option[(V, E)] = maybeKey.map(key => (key, event))
                  keyed
                }
              )
              .keyBy((p: (V, E)) => p._1)
              .flatMapWithState(funTrans)

        val newEntry: (String, DataStream[E] => DataStream[R]) =
          wit.value.name -> transformation

        rest.dispatchTable(fun) + newEntry
      }
    }

  /**
   * Entry point of the derivation: the table of a type `T` is the table derived from its
   * `LabelledGeneric` representation `Rpr`.
   *
   * @param lg       links `T` to its labelled representation
   * @param genTable table generator for that representation
   */
  implicit def caseClass[T, Rpr](
    implicit
    lg: LabelledGeneric.Aux[T, Rpr],
    genTable: DispatchTableGen[Rpr, T]
  ): DispatchTableGen[T, T] =
    new DispatchTableGen[T, T] {
      def dispatchTable[R: TypeInformation, S: TypeInformation](
        fun: (T, Option[S]) => (TraversableOnce[R], Option[S])
      ): Map[String, DataStream[T] => DataStream[R]] = genTable.dispatchTable(fun)
    }
}
/**
 * A table of named stream transformations from events `E` to results `R`.
 *
 * @tparam E event type of the input streams
 * @tparam R element type of the output streams
 */
trait DispatchTable[E, R] extends Serializable {

  /** Maps an entry name to the transformation registered under it. */
  val dispatchTable: Map[String, DataStream[E] => DataStream[R]]
}
/** Factory for [[DispatchTable]] values. */
object DispatchTable {

  /**
   * Builds the dispatch table of event type `E` for the given stateful function.
   *
   * @param fun   function receiving an event and the optional current state, returning the
   *              produced results together with the optional next state
   * @param dispt implicitly derived generator for `E`
   * @tparam E event type
   * @tparam R result type
   * @tparam S state type
   * @return a table wrapping the map produced by `dispt`
   */
  def apply[E, R: TypeInformation, S: TypeInformation](
    fun: (E, Option[S]) => (TraversableOnce[R], Option[S])
  )(implicit dispt: DispatchTableGen[E, E]): DispatchTable[E, R] = {
    val generated: Map[String, DataStream[E] => DataStream[R]] = dispt.dispatchTable(fun)
    new DispatchTable[E, R] {
      val dispatchTable: Map[String, DataStream[E] => DataStream[R]] = generated
    }
  }
}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream}
import org.apache.flink.streaming.api.scala._
import shapeless._
import record._
import shapeless.LabelledGeneric
import shapeless.labelled.FieldType
import shapeless.Witness
import shapeless.ops.record.Selector
import shapeless.syntax.RecordOps
import shapeless.syntax.singleton._
import shapeless.record._
/**
 * Type class that keys a stream of `T` by the field identified by `K`.
 *
 * It is `Serializable`, like `DispatchTableGen`: the implicit instances in this file build
 * their `addKeys` lambdas from values held by the instance, so the instance presumably has to
 * be serialized together with the functions handed to Flink operators.
 *
 * @tparam K type-level identifier of the key field
 * @tparam T element type of the stream
 */
trait KeyGenerator[K, T] extends Serializable {

  /** Type of the field as declared on `T`. */
  type KV

  /** Type of the key the resulting stream is partitioned by. */
  type OUT_K

  /** Pairs each element with its key and keys the stream by that key. */
  def addKeys: DataStream[T] => KeyedStream[(OUT_K, T), OUT_K]
}
/**
 * Lower-priority instances of [[KeyGenerator]], plus the `Aux` alias exposing its type members.
 *
 * The `KeyGenerator` companion extends this trait, so the instance declared here is only
 * picked when the companion's own instance does not apply. The trait is `Serializable` to
 * match `LowPriorityDispatchTableGen`.
 */
trait LowPriorityKeyGenerator extends Serializable {

  /** A [[KeyGenerator]] whose `KV` and `OUT_K` members are fixed to `KV0` and `OUT_K0`. */
  type Aux[K, T, KV0, OUT_K0] = KeyGenerator[K, T] {
    type KV = KV0
    type OUT_K = OUT_K0
  }

  /**
   * Generator for a field `K` whose value is used as the key directly.
   *
   * Every element is paired with the value of the field and the stream is keyed by it.
   *
   * @param lgen     converts an element into its labelled record representation
   * @param selector reads field `K` from the record
   * @param ev       evidence that `OV` equals the key type `V`
   * @param tpInfo   Flink type information for the (record, element) pair
   * @param tpInfoV  Flink type information for the key
   * @param tpInfoT  Flink type information for the element
   */
  implicit def keyGeneratorNotOption[K, V, OV, T, Rpr <: HList](
    implicit
    lgen: LabelledGeneric.Aux[T, Rpr],
    selector: Selector.Aux[Rpr, K, V],
    ev: OV =:= V,
    tpInfo: TypeInformation[(Rpr, T)],
    tpInfoV: TypeInformation[V],
    tpInfoT: TypeInformation[T]
  ): Aux[K, T, OV, V] =
    // Serializable is mixed in explicitly: the lambdas below read `lgen` and `selector`
    // from this instance, so it may be serialized along with them.
    new KeyGenerator[K, T] with Serializable {
      type KV = OV
      type OUT_K = V

      def addKeys: DataStream[T] => KeyedStream[(V, T), V] =
        (stream: DataStream[T]) =>
          stream
            .map(
              (e: T) => {
                val key: V = selector(lgen.to(e))
                (key, e)
              }
            )
            .keyBy((p: (V, T)) => p._1)
    }
}
/**
 * Summoner and the `Option`-field instance of [[KeyGenerator]]. The instance for
 * non-optional fields is inherited from [[LowPriorityKeyGenerator]].
 */
object KeyGenerator extends LowPriorityKeyGenerator {
  // def apply[T](k: Witness)(
  // implicit
  // keyGenerator: KeyGenerator[k.T, T]
  // ): Aux[k.T, T, keyGenerator.KV, keyGenerator.OUT_K] = keyGenerator

  /**
   * Summons the generator for field `K` of `T`, keeping its `KV` and `OUT_K` members visible
   * in the result type.
   *
   * @param wit          witness of the field identifier
   * @param keyGenerator the implicitly resolved generator
   */
  def apply[K,T](
    implicit
    wit: Witness.Aux[K],
    keyGenerator: KeyGenerator[K, T]
  ): Aux[K, T, keyGenerator.KV, keyGenerator.OUT_K] = keyGenerator

  /**
   * Generator for a field `K` of type `Option[V]`.
   *
   * Elements whose field is `None` are dropped; the others are paired with the contained
   * value and the stream is keyed by it.
   *
   * @param lgen     converts an element into its labelled record representation
   * @param selector reads field `K` from the record
   * @param u        unpacks the field type `OV` as `Option[V]`
   * @param ev       evidence that the field type is `Option[V]`
   * @param tpInfo   Flink type information for the (record, element) pair
   * @param tpInfoV  Flink type information for the key
   * @param tpInfoT  Flink type information for the element
   */
  implicit def keyGenerator[K, V, OV, T, Rpr <: HList](
    implicit
    lgen: LabelledGeneric.Aux[T, Rpr],
    selector: Selector.Aux[Rpr, K, OV],
    u: Unpack1[OV, Option, V],
    ev: OV =:= Option[V],
    tpInfo: TypeInformation[(Rpr, T)],
    tpInfoV: TypeInformation[V],
    tpInfoT: TypeInformation[T]
  ): Aux[K, T, OV, V] =
    // Serializable is mixed in explicitly: the lambdas below read `lgen`, `selector` and
    // `ev` from this instance, so it may be serialized along with them.
    new KeyGenerator[K, T] with Serializable {
      type KV = OV
      type OUT_K = V

      def addKeys: DataStream[T] => KeyedStream[(V, T), V] =
        (stream: DataStream[T]) =>
          stream
            .flatMap(
              (e: T) => {
                // Use the evidence to view the field as Option[V] instead of matching a
                // value of the abstract type OV against Some/None with an erased type test.
                val maybeKey: Option[V] = ev(selector(lgen.to(e)))
                val keyed: Option[(V, T)] = maybeKey.map(key => (key, e))
                keyed
              }
            )
            .keyBy((p: (V, T)) => p._1)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment