Skip to content

Instantly share code, notes, and snippets.

@Igosuki
Last active December 20, 2021 15:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Igosuki/b57b14094eab733f3cd6f6ca14da2dc5 to your computer and use it in GitHub Desktop.
Save Igosuki/b57b14094eab733f3cd6f6ca14da2dc5 to your computer and use it in GitHub Desktop.
http4s fs2 websocket
abstract class UserWSEndpoint[F[+ _]: Timer: Concurrent: ContextShift](
broadcastTopic: Topic[F, PushNotification])(implicit F: ConcurrentEffect[F])
extends Http4sDsl[F]
with Loggable {
def broadcast(e: PushNotification) = broadcastTopic.publish1(e)
def toText[A: Encoder](a: A)(implicit e: Encoder[A], wse: Encoder[WSMessage]) =
Text(wse(WSMessage("", "", e(a).some, "", "".some)).asString.get)
def mergedStreams(m: mutable.Map[String, Stream[F, WebSocketFrame]]) =
m.toList match {
case head :: tail => tail.foldLeft(head._2)((s1, s2) => s1.merge(s2._2))
case Nil => Stream.empty
}
val emptyJson = Json.obj()
def textReply(wsm: WSMessage, msg: Json) =
Text(wsMsgEncoder(WSMessage(wsm.topic, REPLY, Some(msg), wsm.ref, wsm.joinRef)).toString)
def startBroadcastSubscriber(pushTopic: Topic[F, WebSocketFrame.Text],
registry: mutable.HashMap[String, (String, String)]) =
broadcastTopic
.subscribe(10)
.filter { p => registry.contains(p.topic) }
.map { p =>
Text(wsMsgEncoder(WSMessage(p.topic, p.event, Some(p.j), "", Some(""))).toString)
}
.through(pushTopic.publish)
val routes: HttpRoutes[F] = HttpRoutes.of[F] {
case req @ POST -> Root / "broadcast" =>
req.decode[PushNotification] { p =>
broadcast(p).flatMap(_ => Ok("Kek"))
}.handleErrorWith { // We can handle errors using effect methods
case e: Exception => BadRequest("Bad message")
}
case GET -> Root / "users" / "websocket" =>
// Topic, (Ref, JoinRef)
val subscriptions = mutable.HashMap[String, (String, String)]()
val addSubscription = (t: Topic[F, WebSocketFrame.Text], wsm: WSMessage) => {
subscriptions.put(wsm.topic, (wsm.ref, wsm.joinRef.getOrElse("")))
logger.info(s"Joined ${wsm.topic}")
t.publish1(textReply(wsm, PhxReply(OK, emptyJson).asJson))
}
val clientTopic =
Topic(Text(wsMsgEncoder(WSMessage("opened", OK, Some(Json.obj()), "0", Some("0"))).toString()))
val wsE = for {
t <- clientTopic
_ <- startBroadcastSubscriber(t, subscriptions)
// .runAsync(e => IO(logger.info(s"Exception occurred while draining publisher topic $e"))).unsafeRunSync
joinTopic = (wsm: WSMessage) => addSubscription(t, wsm)
leaveTopic = (topic: String) => {
subscriptions.remove _
logger.info(s"Left $topic")
}
fromClient = (s: Stream[F, WebSocketFrame]) =>
s.evalMap { (ws: WebSocketFrame) =>
ws match {
case Text(txt, _) =>
decode[WSMessage](txt).fold(
e => F.delay(logger.info(s"Parse error $e")),
m =>
m.event match {
case "join" => joinTopic(m)
case "leave" => F.delay(leaveTopic(m.topic))
case "heartbeat" if m.topic == "main" =>
t.publish1(textReply(m, PhxReply(OK, emptyJson).asJson))
case s => F.delay(logger.info(s))
}
)
case f => F.delay(logger.info(s"Unknown type: $f"))
}
}
} yield WebSocketBuilder[F].build(t.subscribe(10), fromClient)
wsE.flatten
case GET -> Root / "echosocket" =>
val queue = fs2.concurrent.Queue.unbounded[F, WebSocketFrame]
val echoReply: Pipe[F, WebSocketFrame, WebSocketFrame] = _.collect {
case Text(msg, _) => Text("You sent the server: " + msg)
case _ => Text("Something new")
}
queue.flatMap { q =>
val d = q.dequeue.through(echoReply)
val e = q.enqueue
WebSocketBuilder[F].build(d, e)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment