Skip to content

Instantly share code, notes, and snippets.

@tsabirgaliev
Last active March 25, 2017 06:52
Show Gist options
  • Save tsabirgaliev/4168639e991bd13e364c54def5c6d2cd to your computer and use it in GitHub Desktop.
Save tsabirgaliev/4168639e991bd13e364c54def5c6d2cd to your computer and use it in GitHub Desktop.
package custom
import akka.actor._
import akka.stream.{ Materializer, OverflowStrategy }
import akka.stream.scaladsl.{ Sink, Keep, Source, Flow }
object CustomActorFlow {
def actorRef[In, Out](props: ActorRef => Props, bufferSize: Int = 16, overflowStrategy: OverflowStrategy = OverflowStrategy.dropNew)(implicit factory: ActorRefFactory, mat: Materializer): (Flow[In, Out, _], ActorRef) = {
val (outActor, publisher) = Source.actorRef[Out](bufferSize, overflowStrategy)
.toMat(Sink.asPublisher(false))(Keep.both).run()
val sinkActor = factory.actorOf(Props(new Actor {
val flowActor = context.watch(context.actorOf(props(outActor), "flowActor"))
def receive = {
case Status.Success(_) | Status.Failure(_) => flowActor ! PoisonPill
case Terminated(_) => context.stop(self)
case other => flowActor ! other // <<-- whatever you send to sinkActor is forwarded to your actor, except above
}
override def supervisorStrategy = OneForOneStrategy() {
case _ => SupervisorStrategy.Stop
}
}))
(Flow.fromSinkAndSource(
Sink.actorRef(sinkActor, Status.Success(())),
Source.fromPublisher(publisher)
), sinkActor) // <<-- return as a tuple
}
}
// Here is how you use it
val (flow, underlyingActor) = CustomActorFlow.actorRef { out => MyWebSocketActor.props(out) }
underlyingActor ! "Whatever"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment