Skip to content

Instantly share code, notes, and snippets.

@kareblak
Created May 26, 2017 10:23
Show Gist options
  • Save kareblak/ae8f2fe543da625caf4e53fcb5aed1fc to your computer and use it in GitHub Desktop.
Save kareblak/ae8f2fe543da625caf4e53fcb5aed1fc to your computer and use it in GitHub Desktop.
import fs2._
import org.http4s._
import org.http4s.dsl._
import org.http4s.server.blaze._
import org.http4s.server.websocket._
import org.http4s.util._
import org.http4s.websocket.WebsocketBits
import scala.concurrent.duration._
import fs2.async.mutable.Topic
/**
 * Small http4s/fs2 demo application.
 *
 * A timer-driven stream publishes text frames to a shared [[Topic]], and a blaze
 * server bound to local port 8080 hands those frames to websocket clients that
 * connect on `GET /`. `GET /ok` answers with a plain `Status.Ok` response.
 */
object Main2 extends StreamApp {

  // Implicits required by the fs2 primitives used below (timer, topic, merge).
  implicit val strategy: Strategy = Strategy.fromFixedDaemonPool(1)
  implicit val scheduler: Scheduler = Scheduler.fromFixedDaemonPool(1)

  /**
   * Builds the stream that feeds `topic`.
   *
   * Every value emitted by `time.awakeEvery` (configured with a one second period)
   * is turned into a text frame whose payload is `"foo"` followed by that value
   * expressed in whole seconds; each frame is then sent to the topic's publish sink.
   *
   * @param topic topic that receives the generated frames
   * @return the publishing stream; nothing happens until it is run
   */
  def publish(topic: Topic[Task, WebsocketBits.Text]): Stream[Task, Unit] =
    time
      .awakeEvery[Task](1.second)
      .map(elapsed => WebsocketBits.Text(s"foo${elapsed.toSeconds}"))
      .to(topic.publish)

  /**
   * HTTP routes of the demo.
   *
   *  - `GET /`   : websocket whose outgoing frames come from `topic.subscribe(1)`;
   *                the handler for incoming frames ignores its input and is an empty stream.
   *  - `GET /ok` : immediate `Response(Status.Ok)`.
   *
   * @param topic topic the websocket route subscribes to
   */
  def service(topic: Topic[Task, WebsocketBits.Text]) = HttpService {
    case GET -> Root =>
      WS(topic.subscribe(1), _ => Stream.empty)

    case GET -> Root / "ok" =>
      Task.now(Response(Status.Ok))
  }

  /**
   * Application entry stream: allocates the topic (seeded with an empty text
   * frame), then runs the publisher merged with the blaze server.
   *
   * @param args command-line arguments (not used)
   */
  def stream(args: List[String]): Stream[Task, Unit] = {
    val seedFrame = WebsocketBits.Text("")
    val newTopic = Stream.eval(async.topic[Task, WebsocketBits.Text](seedFrame))

    newTopic.flatMap { topic =>
      // Blaze server on local port 8080 with websocket support, serving `service` at "/".
      val server = BlazeBuilder
        .bindLocal(8080)
        .withWebSockets(true)
        .mountService(service(topic), "/")
        .serve

      publish(topic).merge(server)
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment