Skip to content

Instantly share code, notes, and snippets.

@loicdescotte
Last active September 17, 2019 12:52
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save loicdescotte/3914f3fd6513cb85ea1638b60b444f9d to your computer and use it in GitHub Desktop.
Save loicdescotte/3914f3fd6513cb85ea1638b60b444f9d to your computer and use it in GitHub Desktop.
Play and Akka Streams source queue
/**
 * Play controller that streams Server-Sent Events fed by an Akka Streams source queue.
 *
 * For each request to [[queueAction]] a `Source.queue` is created. Once the response
 * stream is materialized, a [[TickActor]] is started and scheduled to push the string
 * `"tack"` into the queue once per second. When the stream terminates (successfully or
 * with a failure) the schedule is cancelled and the actor is stopped.
 *
 * @param actorSystem actor system used to create the ticking actor and its schedule
 * @param exec        execution context used for future callbacks and the scheduler
 */
class Application @Inject() (implicit actorSystem: ActorSystem, exec: ExecutionContext) extends Controller {

  // Explicit type: implicit definitions should not rely on inference.
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Message sent to a [[TickActor]] to make it push one element into its queue. */
  val Tick = "tick"

  /**
   * Actor that offers the string `"tack"` to `queue` each time it receives [[Tick]].
   *
   * @param queue the source queue that elements are pushed into
   */
  class TickActor(queue: SourceQueue[String]) extends Actor {
    def receive: Receive = {
      case Tick =>
        // `offer` is asynchronous; surface failures instead of silently dropping them.
        // The callback only logs and never touches actor state.
        queue.offer("tack").failed.foreach { e =>
          Logger.debug("Could not offer element to the queue", e)
        }
    }
  }

  /**
   * Action that responds with a chunked Server-Sent Events stream of the queued elements.
   *
   * The ticking actor and its schedule are created only once the queue has been
   * materialized, and are released when the queue reports completion or failure.
   */
  def queueAction = Action {
    val (queueSource, futureQueue) = peekMatValue(Source.queue[String](10, OverflowStrategy.fail))

    futureQueue.foreach { queue =>
      val tickActor = actorSystem.actorOf(Props(new TickActor(queue)))
      val tickSchedule =
        actorSystem.scheduler.schedule(0.milliseconds, 1.second, tickActor, Tick)

      // `onComplete` (rather than `map`) so that cleanup also runs when the stream
      // fails, e.g. when the buffer overflows under `OverflowStrategy.fail`.
      queue.watchCompletion().onComplete { outcome =>
        outcome.failed.foreach(e => Logger.debug("Stream terminated with a failure", e))
        Logger.debug("Client disconnected")
        tickSchedule.cancel()
        Logger.debug("Scheduler canceled")
        // Stop the per-request actor; otherwise one actor leaks for every connection.
        actorSystem.stop(tickActor)
      }
    }

    val loggedSource = queueSource.map { e =>
      Logger.debug(s"queue source element : $e")
      e
    }
    Ok.chunked(loggedSource.via(EventSource.flow))
  }

  /**
   * Exposes the materialized value of `src` as a `Future`.
   *
   * The returned future is completed the first time the returned source is materialized;
   * later materializations leave it unchanged (`trySuccess`). If the returned source is
   * never materialized, the future never completes.
   *
   * @param src the source whose materialized value should be observed
   * @tparam T element type of the source
   * @tparam M materialized value type of the source
   * @return the instrumented source together with a future of its materialized value
   */
  def peekMatValue[T, M](src: Source[T, M]): (Source[T, M], Future[M]) = {
    val matValue = Promise[M]()
    val instrumented = src.mapMaterializedValue { m =>
      matValue.trySuccess(m)
      m
    }
    (instrumented, matValue.future)
  }
}
@Manc
Copy link

Manc commented Jul 23, 2018

Thank you so much!!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment