Created
June 3, 2019 12:14
-
-
Save asardaes/65f3dee3d1e3103b4c82d4309d071592 to your computer and use it in GitHub Desktop.
Flink dynamic Key Selector
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A word paired with an integer count.
 *
 * Both properties are mutable, and a zero-argument constructor is available in
 * addition to the primary one.
 *
 * NOTE(review): the mutable properties and the no-arg constructor are presumably
 * here so that Flink can handle this class as a POJO type — confirm.
 */
data class WC(var word: String, var count: Int) : Serializable {
    /** Creates an instance holding an empty word and a count of 0. */
    constructor() : this(word = "", count = 0)
}
/**
 * Key selector whose result depends on how many times this instance has been
 * invoked rather than on the record it receives.
 *
 * The first two calls yield key 0; every later call yields key 1. Each call also
 * prints the current counter together with the record.
 *
 * NOTE(review): the key is not a function of the input value, so this selector
 * is not deterministic per record — presumably intentional for demonstrating a
 * "dynamic" key selector; confirm before reusing it in a real job.
 */
class KS : KeySelector<WC, Int> {
    // Number of getKey invocations seen so far by this instance.
    private var count = 0

    /**
     * Logs the call and returns 0 for the first two invocations, 1 afterwards.
     *
     * @param value the record being keyed; only used for logging.
     * @return 0 or 1, depending solely on the invocation counter.
     */
    override fun getKey(value: WC): Int {
        println("count=$count, value=$value")
        // Snapshot the counter before bumping it (same effect as a post-increment).
        val callsSoFar = count
        count = callsSoFar + 1
        return if (callsSoFar >= 2) 1 else 0
    }
}
/**
 * Reduce function that merges two [WC] records by adding their counts.
 *
 * The merged record keeps the word of the second argument. Every invocation
 * prints the second argument.
 */
class Reducer : ReduceFunction<WC> {
    /**
     * @param value1 the first record; only its count is used.
     * @param value2 the second record; supplies the word of the result.
     * @return a new [WC] with `value2`'s word and the sum of both counts.
     */
    override fun reduce(value1: WC, value2: WC): WC {
        println("Got $value2")
        val total = value1.count + value2.count
        return WC(word = value2.word, count = total)
    }
}
/**
 * Runs a small streaming job: four [WC] records are keyed with [KS], reduced
 * with [Reducer], and the resulting stream is printed.
 */
fun main() {
    // Parallelism is fixed at 1 for this run.
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
        .also { it.parallelism = 1 }

    val values = listOf(
        WC("a", 10),
        WC("a", 10),
        WC("b", 1),
        WC("b", 2),
    )

    val keyed = env.fromCollection(values).keyBy(KS())
    val ans = keyed.reduce(Reducer())
    ans.print()

    env.execute()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
count=0, value=WC(word=a, count=10)
count=1, value=WC(word=a, count=10)
count=2, value=WC(word=b, count=1)
count=3, value=WC(word=b, count=2)
count=0, value=WC(word=a, count=10)
WC(word=a, count=10)
count=1, value=WC(word=a, count=10)
Got WC(word=a, count=10)
WC(word=a, count=20)
count=2, value=WC(word=b, count=1)
WC(word=b, count=1)
count=3, value=WC(word=b, count=2)
Got WC(word=b, count=2)
WC(word=b, count=3)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment