Forked from Zhen-hao/DispatchTableForFlinkStreaming.scala
Created
March 31, 2019 19:29
-
-
Save dadepo/171536105cad96b9734cec558ea586ec to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.typeinfo.TypeInformation | |
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream} | |
import org.apache.flink.streaming.api.scala._ | |
import shapeless._ | |
import record._ | |
import shapeless.LabelledGeneric | |
import shapeless.labelled.FieldType | |
import shapeless.Witness | |
import shapeless.ops.record.Selector | |
import shapeless.syntax.RecordOps | |
import shapeless.syntax.singleton._ | |
import shapeless.record._ | |
/**
 * Type class that produces a table of named stream transformations.
 *
 * @tparam T the shape being walked while the table is derived; the instances in
 *           this file use a labelled record `HList` here, or the event type
 *           itself for the case-class entry point
 * @tparam E the event type carried by the input streams
 */
trait DispatchTableGen[T, E] extends Serializable {

  /**
   * Builds the dispatch table for a stateful flat-map function.
   *
   * @param fun function from an event and the optional current state to the
   *            emitted results and the optional next state
   * @tparam R  type of the emitted results
   * @tparam S  type of the state threaded through `fun`
   * @return a map from a name to a function that turns a `DataStream[E]` into a
   *         `DataStream[R]`
   */
  def dispatchTable[R: TypeInformation, S: TypeInformation](
    fun: (E, Option[S]) => (TraversableOnce[R], Option[S])
  ): Map[String, DataStream[E] => DataStream[R]]
}
/**
 * Fallback [[DispatchTableGen]] instances.
 *
 * The instances live in a parent trait of the `DispatchTableGen` companion so
 * that the companion's own instances take precedence during implicit resolution
 * whenever both apply.
 */
trait LowPriorityDispatchTableGen extends Serializable {

  /**
   * Instance for a record whose head field `K` is used directly as the key.
   *
   * The generated entry is named after the field (`wit.value.name`). It pairs
   * every event with the value of that field, keys the stream by the value and
   * runs `fun` through `flatMapWithState` on the keyed stream. The entry is
   * added to the table produced for the remaining fields by `rest`.
   *
   * @param lg       converts an event into its labelled record representation
   * @param wit      witness for the field label; supplies the entry name
   * @param selector extracts the field `K` from the record
   * @param ev       evidence that the selected type `OV` is the key type `V`
   * @param tpInfoV  Flink type information for the key
   * @param pInfo    Flink type information for the (key, event) pair
   * @param rest     table generator for the tail of the record
   */
  implicit def lowHcons[K <: Symbol, OV, V, T <: HList, E, Rpr <: HList](
    implicit
    lg: LabelledGeneric.Aux[E, Rpr],
    wit: Witness.Aux[K],
    selector: Selector.Aux[Rpr, K, OV],
    ev: OV =:= V,
    tpInfoV: TypeInformation[V],
    pInfo: TypeInformation[(V, E)],
    rest: DispatchTableGen[T, E]
  ): DispatchTableGen[FieldType[K, V] :: T, E] =
    new DispatchTableGen[FieldType[K, V] :: T, E] {
      def dispatchTable[R: TypeInformation, S: TypeInformation](
        fun: (E, Option[S]) => (TraversableOnce[R], Option[S])
      ): Map[String, DataStream[E] => DataStream[R]] = {
        // The key only serves to partition the stream; `fun` sees the event alone.
        // Plain tuple access is used instead of type patterns on V, E and S,
        // which are erased at runtime and therefore cannot be checked.
        val funTrans: ((V, E), Option[S]) => (TraversableOnce[R], Option[S]) =
          (keyed: (V, E), state: Option[S]) => fun(keyed._2, state)

        val newEntry: (String, DataStream[E] => DataStream[R]) = wit.value.name -> (
          (stream: DataStream[E]) =>
            stream
              // `ev` converts the selected OV into V explicitly rather than
              // relying on it being picked up as an implicit view.
              .map((e: E) => (ev(selector(lg.to(e))), e))
              .keyBy((p: (V, E)) => p._1)
              .flatMapWithState(funTrans)
        )

        rest.dispatchTable(fun) + newEntry
      }
    }
}
/**
 * Preferred [[DispatchTableGen]] instances: the empty record, records whose head
 * field is an `Option`, and the entry point for case classes.
 */
object DispatchTableGen extends LowPriorityDispatchTableGen {

  /** Base case: an empty record contributes no entries. */
  implicit def hnil[E]: DispatchTableGen[HNil, E] =
    new DispatchTableGen[HNil, E] {
      def dispatchTable[R: TypeInformation, S: TypeInformation](
        fun: (E, Option[S]) => (TraversableOnce[R], Option[S])
      ): Map[String, DataStream[E] => DataStream[R]] = Map.empty
    }

  /**
   * Instance for a record whose head field `K` has the optional type
   * `OV = Option[V]`.
   *
   * The generated entry is named after the field (`wit.value.name`). Events for
   * which the field is `None` are dropped; the remaining events are keyed by the
   * unwrapped value and `fun` is run through `flatMapWithState` on the keyed
   * stream. The entry is added to the table produced for the remaining fields
   * by `rest`.
   *
   * @param lg       converts an event into its labelled record representation
   * @param wit      witness for the field label; supplies the entry name
   * @param selector extracts the field `K` from the record
   * @param u        not referenced in the body; presumably present so that `V`
   *                 can be inferred from `OV`
   * @param ev       evidence that `OV` is `Option[V]`
   * @param tpInfoV  Flink type information for the key
   * @param pInfo    Flink type information for the (key, event) pair
   * @param rest     table generator for the tail of the record
   */
  implicit def hcons[K <: Symbol, OV, V, T <: HList, E, Rpr <: HList](
    implicit
    lg: LabelledGeneric.Aux[E, Rpr],
    wit: Witness.Aux[K],
    selector: Selector.Aux[Rpr, K, OV],
    u: Unpack1[OV, Option, V],
    ev: OV =:= Option[V],
    tpInfoV: TypeInformation[V],
    pInfo: TypeInformation[(V, E)],
    rest: DispatchTableGen[T, E]
  ): DispatchTableGen[FieldType[K, OV] :: T, E] =
    new DispatchTableGen[FieldType[K, OV] :: T, E] {
      def dispatchTable[R: TypeInformation, S: TypeInformation](
        fun: (E, Option[S]) => (TraversableOnce[R], Option[S])
      ): Map[String, DataStream[E] => DataStream[R]] = {
        // The key only serves to partition the stream; `fun` sees the event alone.
        // Plain tuple access is used instead of type patterns on V, E and S,
        // which are erased at runtime and therefore cannot be checked.
        val funTrans: ((V, E), Option[S]) => (TraversableOnce[R], Option[S]) =
          (keyed: (V, E), state: Option[S]) => fun(keyed._2, state)

        val newEntry: (String, DataStream[E] => DataStream[R]) = wit.value.name -> (
          (stream: DataStream[E]) =>
            stream
              // `ev` turns the abstract OV into Option[V], so the option can be
              // mapped directly instead of pattern matching on an erased type.
              // An empty option yields no output, which drops the event.
              .flatMap((event: E) => ev(selector(lg.to(event))).map((v: V) => (v, event)))
              .keyBy((p: (V, E)) => p._1)
              .flatMapWithState(funTrans)
        )

        rest.dispatchTable(fun) + newEntry
      }
    }

  /**
   * Entry point for a case class `T`: delegates to the instance derived for its
   * labelled record representation `Rpr`.
   *
   * @param lg       ties `T` to its record representation `Rpr`
   * @param genTable table generator derived for `Rpr`
   */
  implicit def caseClass[T, Rpr](
    implicit
    lg: LabelledGeneric.Aux[T, Rpr],
    genTable: DispatchTableGen[Rpr, T]
  ): DispatchTableGen[T, T] =
    new DispatchTableGen[T, T] {
      def dispatchTable[R: TypeInformation, S: TypeInformation](
        fun: (T, Option[S]) => (TraversableOnce[R], Option[S])
      ): Map[String, DataStream[T] => DataStream[R]] = genTable.dispatchTable(fun)
    }
}
/**
 * Holder for an already-built table of named stream transformations.
 *
 * @tparam E the event type of the input streams
 * @tparam R the result type of the output streams
 */
trait DispatchTable[E, R] extends Serializable {

  /** Map from a name to a function turning a `DataStream[E]` into a `DataStream[R]`. */
  val dispatchTable: Map[String, DataStream[E] => DataStream[R]]
}
/** Factory for [[DispatchTable]] values. */
object DispatchTable {

  /**
   * Builds the dispatch table for `fun` using the implicitly derived generator.
   *
   * @param fun   function from an event and the optional current state to the
   *              emitted results and the optional next state
   * @param dispt generator derived for the event type `E`
   * @tparam E    the event type
   * @tparam R    type of the emitted results
   * @tparam S    type of the state threaded through `fun`
   * @return a [[DispatchTable]] wrapping the generated map
   */
  def apply[E, R: TypeInformation, S: TypeInformation](
    fun: (E, Option[S]) => (TraversableOnce[R], Option[S])
  )(implicit dispt: DispatchTableGen[E, E]): DispatchTable[E, R] = {
    val table: Map[String, DataStream[E] => DataStream[R]] = dispt.dispatchTable(fun)

    new DispatchTable[E, R] {
      val dispatchTable: Map[String, DataStream[E] => DataStream[R]] = table
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.typeinfo.TypeInformation | |
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream} | |
import org.apache.flink.streaming.api.scala._ | |
import shapeless._ | |
import record._ | |
import shapeless.LabelledGeneric | |
import shapeless.labelled.FieldType | |
import shapeless.Witness | |
import shapeless.ops.record.Selector | |
import shapeless.syntax.RecordOps | |
import shapeless.syntax.singleton._ | |
import shapeless.record._ | |
/**
 * Type class that keys a stream of `T` by the record field labelled `K`.
 *
 * @tparam K the label of the field to key by
 * @tparam T the element type of the stream
 */
trait KeyGenerator[K, T] {

  /** Type fixed by each instance; the instances in this file set it to the selected field's type. */
  type KV

  /** Type of the key attached to each element of the keyed stream. */
  type OUT_K

  /** Pairs each element with its key and returns the stream keyed by that key. */
  def addKeys: DataStream[T] => KeyedStream[(OUT_K, T), OUT_K]
}
/**
 * Fallback [[KeyGenerator]] instance plus the `Aux` alias.
 *
 * The instance lives in a parent trait of the `KeyGenerator` companion so that
 * the companion's own instance takes precedence during implicit resolution
 * whenever both apply.
 */
trait LowPriorityKeyGenerator {

  /** Exposes the `KV` and `OUT_K` type members of a [[KeyGenerator]] as type parameters. */
  type Aux[K, T, KV0, OUT_K0] = KeyGenerator[K, T] {
    type KV = KV0
    type OUT_K = OUT_K0
  }

  /**
   * Instance that keys by the value of field `K` exactly as it is stored.
   *
   * @param lgen     converts an element into its labelled record representation
   * @param selector extracts the field `K`, of type `V`, from the record
   * @param ev       evidence that `OV` equals `V`; not referenced in the body
   * @param tpInfo   not referenced in the body.
   *                 NOTE(review): the stream built here holds `(V, T)` pairs, so
   *                 `TypeInformation[(V, T)]` may have been intended - confirm.
   * @param tpInfoV  Flink type information for the key
   * @param tpInfoT  Flink type information for the element
   */
  implicit def keyGeneratorNotOption[K, V, OV, T, Rpr <: HList](
    implicit
    lgen: LabelledGeneric.Aux[T, Rpr],
    selector: Selector.Aux[Rpr, K, V],
    ev: OV =:= V,
    tpInfo: TypeInformation[(Rpr, T)],
    tpInfoV: TypeInformation[V],
    tpInfoT: TypeInformation[T]
  ): Aux[K, T, OV, V] = new KeyGenerator[K, T] {
    type KV = OV
    type OUT_K = V

    def addKeys: DataStream[T] => KeyedStream[(V, T), V] =
      (stream: DataStream[T]) => {
        // Step 1: attach the selected field value to every element.
        val paired = stream.map { (element: T) =>
          val key: V = selector(lgen.to(element))
          (key, element)
        }
        // Step 2: partition by the attached value.
        paired.keyBy((pair: (V, T)) => pair._1)
      }
  }
}
/**
 * Summoner and preferred instance for [[KeyGenerator]].
 */
object KeyGenerator extends LowPriorityKeyGenerator {
  // Alternative summoner taking the label as a value, kept from the original:
  // def apply[T](k: Witness)(
  //   implicit
  //   keyGenerator: KeyGenerator[k.T, T]
  // ): Aux[k.T, T, keyGenerator.KV, keyGenerator.OUT_K] = keyGenerator

  /**
   * Summons the [[KeyGenerator]] for label `K` and element type `T`, exposing
   * its type members through `Aux`.
   *
   * @param wit          witness for the label `K`; not referenced in the body
   * @param keyGenerator the resolved instance, returned unchanged
   */
  def apply[K, T](
    implicit
    wit: Witness.Aux[K],
    keyGenerator: KeyGenerator[K, T]
  ): Aux[K, T, keyGenerator.KV, keyGenerator.OUT_K] = keyGenerator

  /**
   * Instance for a field `K` of optional type `OV = Option[V]`.
   *
   * Elements for which the field is `None` are dropped; the remaining elements
   * are paired with the unwrapped value and the stream is keyed by it.
   *
   * @param lgen     converts an element into its labelled record representation
   * @param selector extracts the field `K` from the record
   * @param u        not referenced in the body; presumably present so that `V`
   *                 can be inferred from `OV`
   * @param ev       evidence that `OV` is `Option[V]`
   * @param tpInfo   not referenced in the body.
   *                 NOTE(review): the stream built here holds `(V, T)` pairs, so
   *                 `TypeInformation[(V, T)]` may have been intended - confirm.
   * @param tpInfoV  Flink type information for the key
   * @param tpInfoT  Flink type information for the element
   */
  implicit def keyGenerator[K, V, OV, T, Rpr <: HList](
    implicit
    lgen: LabelledGeneric.Aux[T, Rpr],
    selector: Selector.Aux[Rpr, K, OV],
    u: Unpack1[OV, Option, V],
    ev: OV =:= Option[V],
    tpInfo: TypeInformation[(Rpr, T)],
    tpInfoV: TypeInformation[V],
    tpInfoT: TypeInformation[T]
  ): Aux[K, T, OV, V] = new KeyGenerator[K, T] {
    type KV = OV
    type OUT_K = V

    def addKeys: DataStream[T] => KeyedStream[(V, T), V] =
      (stream: DataStream[T]) => {
        stream
          // `ev` turns the abstract OV into Option[V], so the option can be
          // mapped directly instead of pattern matching on an erased type.
          // An empty option yields no output, which drops the element.
          .flatMap((e: T) => ev(selector(lgen.to(e))).map((v: V) => (v, e)))
          .keyBy((p: (V, T)) => p._1)
      }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment