Skip to content

Instantly share code, notes, and snippets.

@colinbes
Created January 6, 2020 16:47
Show Gist options
  • Save colinbes/d39a10add371c45d04600eb3e0ceb4d0 to your computer and use it in GitHub Desktop.
Save colinbes/d39a10add371c45d04600eb3e0ceb4d0 to your computer and use it in GitHub Desktop.
SSE Rest Endpoint
package com.besterdesigns.rest.services
import akka.actor.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.http.scaladsl.server.directives.MethodDirectives.{get, post}
import akka.http.scaladsl.server.directives.PathDirectives.path
import akka.http.scaladsl.server.directives.RouteDirectives.complete
import akka.stream.scaladsl.{BroadcastHub, Keep, Source, SourceQueueWithComplete}
import akka.stream.{DelayOverflowStrategy, OverflowStrategy, _}
import akka.util.Timeout
import com.besterdesigns.actors.StreamingEventSourceActor
import com.besterdesigns.models.Json4sFormat
import scala.concurrent.duration._
trait BasicService extends Json4sFormat {
import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
import com.besterdesigns.actors.StartClock
implicit val actorSystem: ActorSystem
implicit val materializer: ActorMaterializer
implicit lazy val ec = actorSystem.dispatcher
//Thanks to https://gist.github.com/claudio-scandura/a8b0011beddb6c8d41f81a551003dc9f for tip on using Source.queue
lazy val (sourceQueue, eventsSource) = Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
.delay(1.seconds, DelayOverflowStrategy.backpressure)
.map(message => ServerSentEvent(message, Some("myEvent")))
.keepAlive(1.second, () => ServerSentEvent.heartbeat)
.toMat(BroadcastHub.sink[ServerSentEvent])(Keep.both)
.run()
lazy val streamingActor = actorSystem.actorOf(StreamingEventSourceActor.props(sourceQueue), name = StreamingEventSourceActor.name)
lazy val myRoute: Route =
path("events") {
concat(
get {
complete {
actorSystem.scheduler.scheduleOnce(2.second) {
streamingActor ! StartClock
}
eventsSource
}
},
put {
// PUT method will post even to connected clients
entity(as[String]) { event =>
complete {
streamingActor ! StreamingEventSourceActor.UpdateDashboard(event)
StatusCodes.OK
}
}
}
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment