Play and Akka Streams source queue
class Application @Inject() (implicit actorSystem: ActorSystem, exec: ExecutionContext) extends Controller { | |
implicit val materializer = ActorMaterializer() | |
val Tick = "tick" | |
class TickActor(queue: SourceQueue[String]) extends Actor { | |
def receive = { | |
case Tick => queue.offer("tack") | |
} | |
} | |
def queueAction = Action { | |
val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail)) | |
futureQueue.map { queue => | |
val tickActor = actorSystem.actorOf(Props(new TickActor(queue))) | |
val tickSchedule = | |
actorSystem.scheduler.schedule(0 milliseconds, | |
1 second, | |
tickActor, | |
Tick) | |
queue.watchCompletion().map{ done => | |
Logger.debug("Client disconnected") | |
tickSchedule.cancel | |
Logger.debug("Scheduler canceled") | |
} | |
} | |
Ok.chunked( | |
queueSource.map{e => | |
Logger.debug("queue source element : " + e) | |
e | |
} | |
via EventSource.flow | |
) | |
} | |
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = { | |
val p = Promise[M] | |
val s = src.mapMaterializedValue { m => | |
p.trySuccess(m) | |
m | |
} | |
(s, p.future) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This comment has been minimized.
Thank you so much!!