Skip to content

Instantly share code, notes, and snippets.

@rrevol
Created February 20, 2018 15:01
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4 to your computer and use it in GitHub Desktop.
Save rrevol/06ddfecd5f5ac7cbc67785b5d3a84dd4 to your computer and use it in GitHub Desktop.
Small test reproducing a deadlock in Flink
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
/**
 * Small test reproducing a deadlock in Flink.
 *
 * The job wires an unbounded `Int` source into a streaming iteration. Each
 * element `x` enters the loop as the pair `(x, x)`; every pass through the loop
 * increments the second component. A pair is routed to the "output" stream once
 * its incremented counter is a multiple of [[MAX_LOOP_NB]], otherwise it is
 * routed to the "feedback" stream and goes around again.
 */
object FlinkDeadlockTest {

  /** A pair leaves the loop when its counter is a multiple of this value. */
  val MAX_LOOP_NB = 10

  /**
   * Builds the iterative job described on the enclosing object and runs it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.addSource(new SourceFunction[Int] {
      // cancel() and run() execute on different threads (see Flink's
      // SourceFunction documentation); the flag must be volatile so the write
      // in cancel() is guaranteed to become visible to the emitting loop.
      @volatile var goOn = true

      override def cancel(): Unit = {
        println("cancel")
        goOn = false
      }

      // Emits 0, 1, 2, ... for as long as the source has not been cancelled.
      override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
        Stream.from(0).takeWhile(_ => goOn).foreach(ctx.collect)
      }
    })
      .map(x => (x, x))
      .iterate((input: DataStream[(Int, Int)]) => {
        // One loop pass: bump the counter, then tag the pair by destination.
        val split = input
          .map(pair => (pair._1, pair._2 + 1))
          .split(pair =>
            if (pair._2 % MAX_LOOP_NB == 0) List("output") else List("feedback")
          )
        // iterate expects (stream fed back into the loop, stream leaving the loop).
        (split.select("feedback"), split.select("output"))
      })
      .print()

    env.execute("test")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment