Created
March 16, 2017 17:20
-
-
Save alexeevg/8feb42e0297a1265feca385efbdbcbda to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object StreamDeadlock extends App { | |
import akka.actor._ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import scala.concurrent.Await | |
implicit val system = ActorSystem() | |
val noFusingMaterializer = ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false)) | |
val fusingMaterializer = ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(true)) | |
val source = Source(1 to 20) | |
val thisCanDeadlockSometimes = Flow[Int].map(_ + 1).mapConcat(x => List(x)) | |
val printFlow = Flow[Int].map(println) | |
val sink = Flow[Int].via(thisCanDeadlockSometimes).via(printFlow).toMat(Sink.ignore)(Keep.right) | |
println("Running inlined stream without fusing: OK") | |
Await.result(source.via(thisCanDeadlockSometimes).via(printFlow).runWith(Sink.ignore)(noFusingMaterializer), Duration.Inf) | |
println("Running composite stream with fusing: OK") | |
Await.result(source.runWith(sink)(fusingMaterializer), Duration.Inf) | |
println("Running composite stream without fusing: DEADLOCK") | |
Await.result(source.runWith(sink)(noFusingMaterializer), Duration.Inf) | |
system.terminate() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Johan Andrén suggested to look at assertAllStagesStopped debug output:
Output: