Created
January 6, 2020 16:47
-
-
Save colinbes/d39a10add371c45d04600eb3e0ceb4d0 to your computer and use it in GitHub Desktop.
SSE Rest Endpoint
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.besterdesigns.rest.services | |
import akka.actor.{ActorRef, ActorSystem} | |
import akka.http.scaladsl.model.StatusCodes | |
import akka.http.scaladsl.model.sse.ServerSentEvent | |
import akka.http.scaladsl.server.Directives._ | |
import akka.http.scaladsl.server.Route | |
import akka.http.scaladsl.server.directives.MethodDirectives.{get, post} | |
import akka.http.scaladsl.server.directives.PathDirectives.path | |
import akka.http.scaladsl.server.directives.RouteDirectives.complete | |
import akka.stream.scaladsl.{BroadcastHub, Keep, Source, SourceQueueWithComplete} | |
import akka.stream.{DelayOverflowStrategy, OverflowStrategy, _} | |
import akka.util.Timeout | |
import com.besterdesigns.actors.StreamingEventSourceActor | |
import com.besterdesigns.models.Json4sFormat | |
import scala.concurrent.duration._ | |
/**
 * REST service exposing a Server-Sent Events (SSE) endpoint at `/events`.
 *
 *  - `GET /events` completes with the shared SSE stream and, two seconds after
 *    the request is handled, sends `StartClock` to the streaming actor.
 *  - `PUT /events` forwards the request body (read as a `String`) to the
 *    streaming actor wrapped in `StreamingEventSourceActor.UpdateDashboard`
 *    and replies with `200 OK`.
 *
 * Implementors must supply the `ActorSystem` and `ActorMaterializer`.
 */
trait BasicService extends Json4sFormat {
  import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
  import com.besterdesigns.actors.StartClock
  import scala.concurrent.ExecutionContextExecutor

  implicit val actorSystem: ActorSystem
  implicit val materializer: ActorMaterializer

  /** Execution context used by the scheduler call below; explicitly typed so implicit resolution is predictable. */
  implicit lazy val ec: ExecutionContextExecutor = actorSystem.dispatcher

  //Thanks to https://gist.github.com/claudio-scandura/a8b0011beddb6c8d41f81a551003dc9f for tip on using Source.queue
  /**
   * The stream is materialized once (on first access) into:
   *  - `sourceQueue`  - the queue that `String` messages are offered to, and
   *  - `eventsSource` - the `BroadcastHub` source handed to every `GET /events` request.
   *
   * NOTE(review): the queue buffer is `Int.MaxValue`, i.e. effectively unbounded;
   * presumably intentional so offers never wait, but memory could grow if nothing
   * consumes the stream - confirm against the producer's behaviour.
   */
  lazy val (sourceQueue, eventsSource) =
    Source
      .queue[String](Int.MaxValue, OverflowStrategy.backpressure)
      // Shift every element by one second before it is emitted downstream.
      .delay(1.seconds, DelayOverflowStrategy.backpressure)
      // Wrap each message as an SSE of type "myEvent".
      .map(message => ServerSentEvent(message, Some("myEvent")))
      // Inject a heartbeat whenever no event has passed for one second.
      .keepAlive(1.second, () => ServerSentEvent.heartbeat)
      .toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
      .run()

  /** Actor that is given `sourceQueue` and receives `StartClock` / `UpdateDashboard` messages from the route. */
  lazy val streamingActor: ActorRef =
    actorSystem.actorOf(
      StreamingEventSourceActor.props(sourceQueue),
      name = StreamingEventSourceActor.name
    )

  /** Route for `/events`: `GET` subscribes to the SSE stream, `PUT` pushes an event to the streaming actor. */
  lazy val myRoute: Route =
    path("events") {
      concat(
        get {
          complete {
            // Evaluated per request: ask the actor to start its clock two seconds from now.
            actorSystem.scheduler.scheduleOnce(2.seconds) {
              streamingActor ! StartClock
            }
            eventsSource
          }
        },
        put {
          // PUT method will post an event to connected clients
          entity(as[String]) { event =>
            complete {
              streamingActor ! StreamingEventSourceActor.UpdateDashboard(event)
              StatusCodes.OK
            }
          }
        }
      )
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment