Skip to content

Instantly share code, notes, and snippets.

@schrepfler
Last active October 3, 2016 22:36
Show Gist options
  • Save schrepfler/c7b2cb1d3f1f5956bc28489fcb45631d to your computer and use it in GitHub Desktop.
Save schrepfler/c7b2cb1d3f1f5956bc28489fcb45631d to your computer and use it in GitHub Desktop.
akka ws client
/**
 * Minimal Akka HTTP WebSocket client.
 *
 * On start-up it opens a WebSocket connection to the event-updates endpoint,
 * sends a single `subscribeAll` command, and forwards every JSON text message
 * received from the server to the `greeter` actor. The outgoing side is kept
 * open indefinitely (via `Source.maybe`) so the connection is not closed after
 * the subscribe command has been sent.
 */
object Client extends App with CirceSupport {
  // Local imports: only the names this object needs beyond the file-level ones.
  import akka.http.scaladsl.model.StatusCodes
  import akka.http.scaladsl.model.ws.{BinaryMessage, Message, TextMessage}
  import scala.concurrent.Future
  import scala.util.{Failure, Success}

  implicit val system: ActorSystem = ActorSystem("client")
  import system.dispatcher

  // Create the 'greeter' actor; it receives every decoded server message.
  val greeter = system.actorOf(Props[Greeter], "greeter")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  val log = system.log

  // The members below are not referenced by the WebSocket wiring in this
  // object; they are kept so anything else referring to them keeps compiling.
  val persistJson = Flow[Json]
  val start = ByteString.empty
  val sep = ByteString("\n")
  val end = ByteString.empty
  implicit val jsonStreamingSupport = EntityStreamingSupport.json()
    .withFramingRenderer(Flow[ByteString].intersperse(start, sep, end))
  val decoder = Decoder[Json]
  val jsonFlow = Flow[Json]

  // Must be a JSON *object*. `Json.fromString` would wrap the text in a JSON
  // string literal, so the server would receive "\"{...}\"" instead of {...}.
  val subscribeCommand: Json = Json.obj("command" -> Json.fromString("subscribeAll"))

  // Emits the subscribe command once, then stays open (never completes unless
  // the materialized promise is completed) so the socket is kept alive.
  val source: Source[Json, Promise[Option[Json]]] = Source(List(subscribeCommand))
    .concatMat(Source.maybe[Json])(Keep.right)

  // Explicit element type: without it the lambda parameter cannot be inferred.
  val sink = Sink.foreach[Json](json => greeter ! json)

  // Json-level protocol: incoming Json goes to the greeter, outgoing Json comes
  // from `source`. The two sides are independent of each other.
  val clientFlow: Flow[Json, Json, NotUsed] = Flow.fromSinkAndSource(sink, source)

  // Turns WebSocket frames into parsed Json. Streamed text frames are collected
  // into a single string; binary frames are drained (so they cannot stall the
  // connection) and dropped; unparseable text is logged and dropped.
  private val decodeIncoming: Flow[Message, Json, NotUsed] =
    Flow[Message]
      .mapAsync[Option[String]](1) {
        case TextMessage.Strict(text) =>
          Future.successful(Some(text))
        case streamed: TextMessage =>
          streamed.textStream.runFold("")(_ + _).map(Some(_))
        case binary: BinaryMessage =>
          binary.dataStream.runWith(Sink.ignore).map(_ => None)
      }
      .mapConcat { maybeText =>
        maybeText.toList.flatMap { text =>
          // NOTE(review): assumes circe-jawn is on the classpath — confirm.
          io.circe.jawn.parse(text).fold[List[Json]](
            failure => {
              log.warning("Dropping non-JSON WebSocket message: {}", failure.getMessage)
              Nil
            },
            json => List(json)
          )
        }
      }

  // `singleWebSocketRequest` needs a Flow[Message, Message, _], so adapt the
  // Json-level flow at both edges.
  private val messageFlow: Flow[Message, Message, NotUsed] =
    decodeIncoming
      .via(clientFlow)
      .map(json => TextMessage(json.noSpaces))

  val (upgradeResponse, _) = Http().singleWebSocketRequest(
    WebSocketRequest("ws://gtp2betastream.systest.williamhill.plc:8080/eventUpdates"),
    messageFlow
  )

  // Surface handshake problems instead of silently discarding the future.
  upgradeResponse.onComplete {
    case Success(upgrade) if upgrade.response.status == StatusCodes.SwitchingProtocols =>
      log.info("WebSocket connection established")
    case Success(upgrade) =>
      log.error("WebSocket upgrade rejected with status {}", upgrade.response.status)
    case Failure(cause) =>
      log.error(cause, "WebSocket connection failed")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment