Create a gist now

Instantly share code, notes, and snippets.

Play and Akka Streams source queue
class Application @Inject() (implicit actorSystem: ActorSystem, exec: ExecutionContext) extends Controller {
implicit val materializer = ActorMaterializer()
val Tick = "tick"
class TickActor(queue: SourceQueue[String]) extends Actor {
def receive = {
case Tick => queue.offer("tack")
}
}
def queueAction = Action {
val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))
futureQueue.map { queue =>
val tickActor = actorSystem.actorOf(Props(new TickActor(queue)))
val tickSchedule =
actorSystem.scheduler.schedule(0 milliseconds,
1 second,
tickActor,
Tick)
queue.watchCompletion().map{ done =>
Logger.debug("Client disconnected")
tickSchedule.cancel
Logger.debug("Scheduler canceled")
}
}
Ok.chunked(
queueSource.map{e =>
Logger.debug("queue source element : " + e)
e
}
via EventSource.flow
)
}
def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
val p = Promise[M]
val s = src.mapMaterializedValue { m =>
p.trySuccess(m)
m
}
(s, p.future)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment