Skip to content

Instantly share code, notes, and snippets.

@schrepfler
Last active September 14, 2016 00:49
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save schrepfler/9333a351738fe59b8085e28614215880 to your computer and use it in GitHub Desktop.
Save schrepfler/9333a351738fe59b8085e28614215880 to your computer and use it in GitHub Desktop.
import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.common.EntityStreamingSupport
import akka.http.scaladsl.model.ws.TextMessage.{Streamed, Strict}
import akka.http.scaladsl.model.ws.{Message, TextMessage, WebSocketRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString
import io.circe.Json
import de.heikoseeberger.akkahttpcirce.CirceSupport._
import akka.http.scaladsl.server.Directives._
import io.circe.generic.auto._
import scala.concurrent.{Future, Promise}
/**
 * Minimal WebSocket client: connects to `ws://localhost:8080/updates`, sends a
 * single `subscribeAll` command and prints every text message received from the
 * server to standard output.
 *
 * The outgoing side never completes on its own (it ends in `Source.maybe`), so
 * the connection stays open after the initial command has been sent.
 */
object WsClient extends App {
  // Local imports so this object is self-contained with respect to the names it adds.
  import akka.http.scaladsl.model.StatusCodes
  import akka.http.scaladsl.model.ws.BinaryMessage
  import scala.util.{Failure, Success}

  implicit val system = ActorSystem("ws-client")
  implicit val materializer = ActorMaterializer()
  val log = system.log

  // NOTE(review): `persistJson`, `start`, `sep`, `end` and `jsonStreamingSupport`
  // are not referenced anywhere in this object; they are kept so that any external
  // reference to these members keeps compiling.
  val persistJson = Flow[Json]
  val start = ByteString.empty
  val sep = ByteString("\n")
  val end = ByteString.empty
  // JSON entity streaming with elements separated by newlines and no prefix/suffix.
  implicit val jsonStreamingSupport =
    EntityStreamingSupport.json().withFramingRenderer(Flow[ByteString].intersperse(start, sep, end))

  // Execution context used by the Future combinators below.
  import system.dispatcher

  /**
   * Normalises incoming messages into strict text messages.
   *
   *  - strict text messages pass through unchanged;
   *  - streamed text messages are concatenated into one string;
   *  - binary messages are not forwarded, but their payload is drained so that
   *    an unconsumed data stream cannot stall the connection.
   *
   * `parallelism = 1` keeps the messages in arrival order.
   */
  val conversionFlow = Flow[Message]
    .mapAsync[Option[String]](parallelism = 1) {
      case TextMessage.Strict(text) =>
        Future.successful(Some(text))
      case TextMessage.Streamed(chunks) =>
        chunks.runFold("")(_ + _).map(Some(_))
      case binary: BinaryMessage =>
        binary.dataStream.runWith(Sink.ignore).map(_ => None)
    }
    .collect { case Some(text) => TextMessage.Strict(text) }

  // Sends the subscription command, then stays open: `Source.maybe` emits nothing
  // until its promise is completed, which keeps the outgoing side from closing.
  val src: Source[Message, Promise[Option[Message]]] =
    Source
      .single(TextMessage("""{"command" : "subscribeAll"}"""))
      .concatMat(Source.maybe[Message])(Keep.right)

  // Prints the text of every normalised incoming message.
  val sink: Sink[Message, NotUsed] =
    conversionFlow.to(Sink.foreach(s => println(s.text)))

  // Incoming and outgoing sides are independent; the promise materialised by
  // `src` is discarded here, so the outgoing side is never completed by this code.
  val clientFlow: Flow[Message, Message, NotUsed] = Flow.fromSinkAndSource(sink, src)

  // Future of the WebSocket handshake result.
  val upgradeResponse =
    Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080/updates"), clientFlow)._1

  // Report the handshake outcome instead of silently ignoring it; on failure
  // nothing else can happen, so shut the actor system down to let the JVM exit.
  upgradeResponse.onComplete {
    case Success(upgrade) if upgrade.response.status == StatusCodes.SwitchingProtocols =>
      log.info("WebSocket connection established")
    case Success(upgrade) =>
      log.error("WebSocket upgrade rejected with status {}", upgrade.response.status)
      system.terminate()
    case Failure(cause) =>
      log.error(cause, "WebSocket connection failed")
      system.terminate()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment