Skip to content

Instantly share code, notes, and snippets.

@elyphas
Created November 7, 2022 19:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elyphas/61a2d9ba28758626c0155679bdd47c89 to your computer and use it in GitHub Desktop.
Save elyphas/61a2d9ba28758626c0155679bdd47c89 to your computer and use it in GitHub Desktop.
object FullRequestHandler {
implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global
type Payload = ByteBuffer
val handler = new FullRequestHandler[ByteBuffer, String, String, Option[String]] {
val clients = mutable.HashSet.empty[NotifiableClient[String, Option[String]]]
val events = mutable.ArrayBuffer.empty[String]
override val initialState = Future.successful(None)
override def onRequest(client: NotifiableClient[String, Option[String]], state: Future[Option[String]], path: List[String], payload: ByteBuffer) = {
def deserialize[S: Pickler](ts: ByteBuffer) = Unpickle[S].fromBytes(ts)
def serialize[S: Pickler](ts: S) = Right(Pickle.intoBytes[S](ts))
def value[S: Pickler](ts: S, events: List[String] = Nil) = Future.successful(ReturnValue(serialize(ts), events))
def valueFut[S: Pickler](ts: Future[S], events: List[String] = Nil) = ts.map(ts => ReturnValue(serialize(ts), events))
def error(ts: String, events: List[String] = Nil) = Future.successful(ReturnValue(Left(ts), events))
path match {
case "getAllTypeDocuments" :: Nil => Response(state, value(true))
case "getAllUsers" :: Nil => Response(state, value(true))
}
}
override def onEvent(client: NotifiableClient[String, Option[String]], state: Future[Option[String]], newEvents: List[String]) = {
events ++= newEvents
val downstreamEvents = newEvents.map(event => s"${event}-ok")
downstreamEvents.foreach(println)
Reaction(state, Future.successful(downstreamEvents))
}
override def onClientConnect(client: NotifiableClient[String, Option[String]], state: Future[Option[String]]): Unit = {
client.notify ( _ => Future.successful ( "started" :: Nil ) )
clients += client
()
}
override def onClientDisconnect( client: NotifiableClient[String, Option[String]], state: Future[Option[String]], reason: DisconnectReason): Unit = {
clients -= client
()
}
}
}
object RESTServer extends App {
import FullRequestHandler._
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val executionContext = scala.concurrent.ExecutionContext.Implicits.global
import myceliumHandler.FullRequestHandler._
val builder = implicitly[AkkaMessageBuilder[ByteBuffer]]
val serializer = implicitly[Serializer[ClientMessage[ByteBuffer], ByteBuffer]]
val deserializer = implicitly[Deserializer[ServerMessage[ByteBuffer, String, String], ByteBuffer]]
val config = WebsocketServerConfig(bufferSize = 5, overflowStrategy = OverflowStrategy.fail)
val server = WebsocketServer.withPayload(config, handler)
val binding = Http().newServerAt(interface, port).bindFlow(handleWebSocketMessages(server.flow()))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment