Skip to content

Instantly share code, notes, and snippets.

@alexeevg
Created March 16, 2017 17:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexeevg/8feb42e0297a1265feca385efbdbcbda to your computer and use it in GitHub Desktop.
Save alexeevg/8feb42e0297a1265feca385efbdbcbda to your computer and use it in GitHub Desktop.
/**
 * Minimal reproduction of an Akka Streams stall.
 *
 * The same `Source(1 to 20)` is pushed through `map` + `mapConcat` + a printing `map`
 * three times:
 *
 *  1. stages inlined directly on the source, materializer with auto-fusing disabled;
 *  2. stages pre-composed into a single `sink` value, materializer with auto-fusing enabled;
 *  3. the same pre-composed `sink`, materializer with auto-fusing disabled.
 *
 * According to the labels printed below, runs 1 and 2 complete while run 3 never does.
 * Every run is awaited with `Duration.Inf`, so if run 3 hangs, `system.terminate()` is
 * never reached and the JVM has to be stopped externally.
 */
object StreamDeadlock extends App {
  import akka.actor._
  import akka.stream._
  import akka.stream.scaladsl._
  import scala.concurrent.Await
  // Required for `Duration.Inf`; none of the Akka wildcard imports above provide it.
  import scala.concurrent.duration.Duration

  implicit val system: ActorSystem = ActorSystem()

  // Two materializers over the same actor system, differing only in the auto-fusing flag.
  val noFusingMaterializer =
    ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
  val fusingMaterializer =
    ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(true))

  val source = Source(1 to 20)

  // Increment each element, then re-emit it through a single-element list.
  val thisCanDeadlockSometimes = Flow[Int].map(_ + 1).mapConcat(x => List(x))

  // Prints every element; the element passed downstream is the `Unit` returned by `println`.
  val printFlow = Flow[Int].map(println)

  // The two flows above pre-composed into one sink; `Keep.right` retains the
  // materialized value of `Sink.ignore`.
  val sink =
    Flow[Int].via(thisCanDeadlockSometimes).via(printFlow).toMat(Sink.ignore)(Keep.right)

  println("Running inlined stream without fusing: OK")
  Await.result(
    source.via(thisCanDeadlockSometimes).via(printFlow).runWith(Sink.ignore)(noFusingMaterializer),
    Duration.Inf)

  println("Running composite stream with fusing: OK")
  Await.result(source.runWith(sink)(fusingMaterializer), Duration.Inf)

  println("Running composite stream without fusing: DEADLOCK")
  Await.result(source.runWith(sink)(noFusingMaterializer), Duration.Inf)

  system.terminate()
}
@alexeevg
Copy link
Author

Johan Andrén suggested looking at the debug output of assertAllStagesStopped:

// this object is not exported by akka-stream-testkit, copy-pasted the relevant parts
package akka {

  import akka.actor.ActorRef
  import akka.stream.Materializer
  import akka.stream.impl.ActorMaterializerImpl
  import akka.stream.impl.StreamSupervisor
  import akka.testkit.TestProbe
  import scala.concurrent.duration._

  object Utils {

    /**
     * Evaluates `block` and then checks that the materializer's stream supervisor has no
     * child actors left.
     *
     * For an [[akka.stream.impl.ActorMaterializerImpl]]:
     *  - the supervisor is first told to stop its current children and the confirmation
     *    is awaited, so the check starts from a clean state;
     *  - `block` is evaluated (it is by-name, so it runs only after that cleanup);
     *  - for up to 5 seconds the supervisor is asked for its children, and the check
     *    passes as soon as the reported set is empty;
     *  - if the check fails, every child from the last reported set is sent
     *    `StreamSupervisor.PrintDebugDump` and the failure is rethrown.
     *
     * For any other materializer, `block` is simply evaluated and its value returned.
     *
     * @param block        code to run; evaluated exactly once
     * @param materializer materializer whose supervisor is inspected
     * @tparam T           result type of `block`
     * @return the value produced by `block`
     */
    def assertAllStagesStopped[T](block: => T)(implicit materializer: Materializer): T =
      materializer match {
        case impl: ActorMaterializerImpl =>
          val probe = TestProbe()(impl.system)
          probe.send(impl.supervisor, StreamSupervisor.StopChildren)
          probe.expectMsg(StreamSupervisor.StoppedChildren)
          val result = block
          probe.within(5.seconds) {
            // Kept outside the `try` so the handler can reach the last reported children.
            var children = Set.empty[ActorRef]
            try probe.awaitAssert {
              impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref)
              children = probe.expectMsgType[StreamSupervisor.Children].children
              assert(
                children.isEmpty,
                s"expected no StreamSupervisor children, but got [${children.mkString(", ")}]")
            }
            catch {
              // `assert` throws an AssertionError, hence Throwable; it is always rethrown
              // after asking the surviving children to dump their state.
              case ex: Throwable =>
                children.foreach(_ ! StreamSupervisor.PrintDebugDump)
                throw ex
            }
          }
          result
        case _ => block
      }

  }
}
...
// Run the composite stream on the non-fusing materializer inside the check; the same
// materializer is also passed explicitly to fill the implicit parameter list.
akka.Utils.assertAllStagesStopped(source.runWith(sink)(noFusingMaterializer))(noFusingMaterializer)

Output:

activeShells (actor: Actor[akka://default/user/StreamSupervisor-0/flow-0-1-iterableSource-statefulMapConcat#666478310]):
  GraphInterpreterShell
    GraphAssembly
      [ GraphStage(StatefulMapConcat) [3246fb96]    []
      ]
      [StatefulMapConcat.in,null]
      [0,-1]
      [null,StatefulMapConcat.out]
      [-1,0]digraph waits {
N0 [label="StatefulMapConcat"]

  Out1 -> N0 [label=shouldPull; color=blue]
}
// (2, 18, 18)() (running=1, shutdown=1)newShells:
activeShells (actor: Actor[akka://default/user/StreamSupervisor-0/flow-0-2-map#-1206207493]):
  GraphInterpreterShell
    GraphAssembly
      [ GraphStage(Map(StreamDeadlock$$$Lambda$158/1010670443@1c0fc129)) [3e6ef8ad]    []
      ]
      [Map.in,null]
      [0,-1]
      [null,Map.out]
      [-1,0]digraph waits {
N0 [label="Map(StreamDeadlock$$$Lambda$158/1010670443@1c0fc129)"]
  N0 -> In0 [label=shouldPull; color=blue]
  Out1 -> N0 [label=shouldPull; color=blue]
}
// (2, 0, 0)() (running=1, shutdown=2)newShells:
activeShells (actor: Actor[akka://default/user/StreamSupervisor-0/flow-0-3-statefulMapConcat#1266396670]):
  GraphInterpreterShell
    GraphAssembly
      [ GraphStage(StatefulMapConcat) [3d1cfad4]    []
      ]
      [StatefulMapConcat.in,null]
      [0,-1]
      [null,StatefulMapConcat.out]
      [-1,0]digraph waits {
N0 [label="StatefulMapConcat"]
  In0 -> N0 [label=shouldPush; color=red];
  N0 -> Out1 [label=shouldPush; color=red];
}
// (2, 1, 1)() (running=1, shutdown=2)newShells:
activeShells (actor: Actor[akka://default/user/StreamSupervisor-0/flow-0-4-map#1133572581]):
  GraphInterpreterShell
    GraphAssembly
      [ GraphStage(Map(StreamDeadlock$$$Lambda$160/961409111@4901a20e)) [0c333c60]    []
      ]
      [Map.in,null]
      [0,-1]
      [null,Map.out]
      [-1,0]digraph waits {
N0 [label="Map(StreamDeadlock$$$Lambda$160/961409111@4901a20e)"]
  In0 -> N0 [label=shouldPush; color=red];
  N0 -> Out1 [label=shouldPush; color=red];
}
// (2, 1, 1)() (running=1, shutdown=2)newShells:
activeShells (actor: Actor[akka://default/user/StreamSupervisor-0/flow-0-5-ignoreSink#-650629835]):
  GraphInterpreterShell
    GraphAssembly
      [ GraphStage(akka.stream.impl.fusing.GraphStages$IgnoreSink$@24975716) [2f8dad04]    []
      ]
      [Ignore.in]
      [0]
      [null]
      [-1]digraph waits {
N0 [label="akka.stream.impl.fusing.GraphStages$IgnoreSink$@24975716"]
  In0 -> N0 [label=shouldPush; color=red];
}
// (1, 1, 1)() (running=1, shutdown=1)newShells:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment