Created
August 6, 2019 14:20
-
-
Save PiotrJander/d3200aa41c9a54e52866b43d8308da76 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.enso | |
import akka.actor.ActorSystem | |
import akka.{ Done, NotUsed } | |
import akka.http.scaladsl.Http | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl._ | |
import akka.http.scaladsl.model._ | |
import akka.http.scaladsl.model.ws._ | |
import scala.concurrent.duration._ | |
import scala.concurrent.Future | |
/** Small latency probe for a WebSocket endpoint.
  *
  * Opens a single client connection to `ws://localhost:8080`, sends 100 JSON
  * "ping" text frames, records a wall-clock timestamp for every frame sent and
  * every frame received, and prints the per-message differences once the
  * stream has finished. The actor system is shut down afterwards so the JVM
  * can exit.
  */
object SingleWebSocketRequest {

  /** Number of ping frames pushed to the server. */
  private val PingCount = 100

  /** Append-only, chronologically ordered log of `System.currentTimeMillis()` readings.
    *
    * The outgoing and incoming halves of the WebSocket flow and the completion
    * callback may all run on different threads, so every access goes through
    * `synchronized` to guarantee visibility of the recorded values.
    */
  private final class TimestampLog {
    private[this] var stamps: Vector[Long] = Vector.empty

    /** Appends the current wall-clock time in milliseconds. */
    def record(): Unit = synchronized {
      stamps = stamps :+ System.currentTimeMillis()
    }

    /** Returns everything recorded so far, oldest first. */
    def snapshot: Vector[Long] = synchronized(stamps)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem             = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val sent     = new TimestampLog
    val received = new TimestampLog

    // Record the arrival time of every incoming message; the payload itself is ignored.
    val printSink: Sink[Message, Future[Done]] =
      Sink.foreach[Message](_ => received.record())

    // The timestamp is taken inside the stream stage, i.e. when the element is
    // actually emitted, rather than inside a lazily evaluated collection.
    // NOTE(review): when this finite source completes the client starts closing
    // the connection, so late replies may never be delivered - confirm against
    // the Akka HTTP notes on half-closed WebSockets if every reply must be counted.
    val helloSource: Source[Message, NotUsed] =
      Source(1 to PingCount).map { _ =>
        sent.record()
        TextMessage("""{"method":"ping","responseSize":4}""")
      }

    // the Future[Done] is the materialized value of Sink.foreach
    // and it is completed when the stream completes
    val flow: Flow[Message, Message, Future[Done]] =
      Flow.fromSinkAndSourceMat(printSink, helloSource)(Keep.left)

    // upgradeResponse is a Future[WebSocketUpgradeResponse] that
    // completes or fails when the connection succeeds or fails
    // and closed is a Future[Done] representing the stream completion from above
    val (upgradeResponse, closed) =
      Http().singleWebSocketRequest(WebSocketRequest("ws://localhost:8080"), flow)

    val connected = upgradeResponse.map { upgrade =>
      // just like a regular http request we can access the response status via upgrade.response.status;
      // status code 101 (Switching Protocols) indicates that the server supports WebSockets
      if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
        Done
      } else {
        throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
      }
    }

    // in a real application you would not side effect here
    // and handle errors more carefully
    connected.onComplete(println)

    // Report on success AND failure, then always release the actor system;
    // without the terminate() call the process would never exit.
    closed.onComplete { outcome =>
      println("closed")
      outcome.failed.foreach(err => println(s"stream failed: $err"))

      val sentAt     = sent.snapshot
      val receivedAt = received.snapshot
      println(
        s"same length? sent=${sentAt.length} received=${receivedAt.length} " +
          s"difference=${sentAt.length - receivedAt.length}"
      )

      // Both logs are oldest-first, so the i-th reply is paired with the i-th ping.
      // Presumably the server answers each ping exactly once and in order - verify.
      val diffs = receivedAt.zip(sentAt).map { case (in, out) => in - out }
      println(s"diffs ${diffs.mkString("[", ", ", "]")}")

      system.terminate()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment