Skip to content

Instantly share code, notes, and snippets.

@asardaes
Created June 3, 2019 12:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save asardaes/65f3dee3d1e3103b4c82d4309d071592 to your computer and use it in GitHub Desktop.
Save asardaes/65f3dee3d1e3103b4c82d4309d071592 to your computer and use it in GitHub Desktop.
Flink dynamic Key Selector
/**
 * A word together with its running count; the record type that flows through the pipeline.
 *
 * Every primary-constructor parameter has a default value, so on the JVM the compiler also
 * emits a public parameterless constructor. `WC()` therefore still produces `WC("", 0)`,
 * both from Kotlin call sites and via reflection.
 *
 * NOTE(review): the properties are deliberately left as `var` — presumably the streaming
 * framework's POJO serializer needs mutable fields and a no-arg constructor; confirm before
 * tightening them to `val`.
 *
 * @property word the token being counted; defaults to the empty string.
 * @property count the number of occurrences accumulated so far; defaults to 0.
 */
data class WC(var word: String = "", var count: Int = 0) : Serializable
/**
 * Key selector whose result depends on how many times this instance has been called,
 * not on the record's content.
 *
 * The first two calls on an instance return key `0`; every later call returns key `1`.
 * Each call also prints the counter (before it is incremented) and the incoming record.
 *
 * NOTE(review): the key is intentionally non-deterministic with respect to the record.
 * Flink normally expects key selectors to be deterministic — confirm this class is only
 * used for experimentation.
 */
class KS : KeySelector<WC, Int> {
    // Number of getKey calls this instance has handled so far.
    private var count = 0

    /**
     * @param value the record being keyed; only used for the log line.
     * @return `0` while fewer than two calls have been handled, otherwise `1`.
     */
    override fun getKey(value: WC): Int {
        println("count=$count, value=$value")
        // Snapshot the counter before bumping it so the decision uses the pre-increment value.
        val callsSoFar = count
        count = callsSoFar + 1
        return when {
            callsSoFar < 2 -> 0
            else -> 1
        }
    }
}
/**
 * Combines two [WC] records by summing their counts.
 *
 * The result carries the word of the second argument, and the second argument is printed
 * on every invocation.
 */
class Reducer : ReduceFunction<WC> {
    /**
     * @param value1 the first record; only its count contributes to the result.
     * @param value2 the second record; supplies the result's word and is logged.
     * @return a new [WC] with `value2`'s word and the sum of both counts.
     */
    override fun reduce(value1: WC, value2: WC): WC {
        println("Got $value2")
        val combinedCount = value1.count + value2.count
        return WC(value2.word, combinedCount)
    }
}
/**
 * Runs a small streaming job at parallelism 1: four [WC] records are keyed by [KS],
 * reduced with [Reducer], and the reduced stream is printed.
 */
fun main() {
    // Single parallel instance for every operator in the job.
    val env = StreamExecutionEnvironment.getExecutionEnvironment().apply {
        parallelism = 1
    }

    val input = listOf(
        WC("a", 10),
        WC("a", 10),
        WC("b", 1),
        WC("b", 2),
    )

    env.fromCollection(input)
        .keyBy(KS())
        .reduce(Reducer())
        .print()

    // Nothing runs until the job graph is submitted here.
    env.execute()
}
count=0, value=WC(word=a, count=10)
count=1, value=WC(word=a, count=10)
count=2, value=WC(word=b, count=1)
count=3, value=WC(word=b, count=2)
count=0, value=WC(word=a, count=10)
WC(word=a, count=10)
count=1, value=WC(word=a, count=10)
Got WC(word=a, count=10)
WC(word=a, count=20)
count=2, value=WC(word=b, count=1)
WC(word=b, count=1)
count=3, value=WC(word=b, count=2)
Got WC(word=b, count=2)
WC(word=b, count=3)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment