Skip to content

Instantly share code, notes, and snippets.

@DaveDeCaprio
Created January 12, 2018 19:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save DaveDeCaprio/4db9d36a5e907fb5810c00e919347aa3 to your computer and use it in GitHub Desktop.
Save DaveDeCaprio/4db9d36a5e907fb5810c00e919347aa3 to your computer and use it in GitHub Desktop.
Lagom websockets client using Akka HTTP
import java.net.URI
import java.util.Locale
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.model.ws._
import akka.http.scaladsl.model.{HttpHeader, Uri}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Source}
import akka.util.ByteString
import com.lightbend.lagom.internal.scaladsl.client.ScaladslWebSocketClient
import com.lightbend.lagom.scaladsl.api.transport.ResponseHeader
import com.typesafe.scalalogging.StrictLogging
import io.netty.handler.codec.http.websocketx._
import play.api.Environment
import play.api.http.HeaderNames
import play.api.inject.ApplicationLifecycle
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, Future, Promise}
abstract class LagomClientApplicationAkkaWebsockets(name: String) extends LagomClientApplication(name) {
override lazy val scaladslWebSocketClient: ScaladslWebSocketClient =
new AkkaHttpWebSocketClient(environment, applicationLifecycle)(executionContext)
}
/** This client works with websockets of TLS. HTTPS addresses are automatically converted to wss calls.
*
* It has the major downside that the ResponseHeaders returned are fake. Therefore if you have custom processing of
* response headers, they won't work correctly here.
*
* Also, the flow changes slightly, so that no HTTP calls are done until the returned Source is materialized and run. This is different than the netty client, which eagerly makes the HTTP call and then leaves the websocket open. This is different from the regular Lagom flow and probably worth knowing about. It basically a philsophical difference in how akka-http and netty handle this, and was hard to work around.
*/
class AkkaHttpWebSocketClient(environment: Environment, applicationLifecycle: ApplicationLifecycle)(implicit ec: ExecutionContext) extends ScaladslWebSocketClient(environment, applicationLifecycle) with StrictLogging {
implicit val system = ActorSystem("AkkaHttpWebSocketClient")
implicit val materializer = ActorMaterializer()
override def shutdown(): Future[_] = {
logger.debug("Shutting down web socket client")
system.terminate()
super.shutdown()
}
override def connect(exceptionSerializer: ExceptionSerializer, version: WebSocketVersion, requestHeader: RequestHeader,
outgoing: Source[ByteString, NotUsed]): Future[(ResponseHeader, Source[ByteString, NotUsed])] = {
logger.debug("Web socket client connecting")
val normalized = requestHeaderUri(requestHeader).normalize()
val tgt = if (normalized.getPath == null || normalized.getPath.trim().isEmpty) {
new URI(normalized.getScheme, normalized.getAuthority, "/", normalized.getQuery, normalized.getFragment)
} else normalized
val headers = ListBuffer[HttpHeader]()//new DefaultHttpHeaders()
messageProtocolToContentTypeHeader(messageHeaderProtocol(requestHeader)).foreach { ct =>
headers.append(RawHeader(HeaderNames.CONTENT_TYPE, ct))
}
val accept = requestHeaderAcceptedResponseProtocols(requestHeader).flatMap { accept =>
messageProtocolToContentTypeHeader(accept)
}.mkString(", ")
if (accept.nonEmpty) {
headers.append(RawHeader(HeaderNames.ACCEPT, accept))
}
messageHeaderHeaders(requestHeader).foreach {
case (_, values) =>
values.foreach { value =>
headers.append(RawHeader(value._1, value._2))
}
}
def wsScheme(http: String) = http match {
case "http" => "ws"
case "https" => "wss"
}
val uri = Uri(tgt.toASCIIString).copy(scheme = wsScheme(tgt.getScheme))
val request = WebSocketRequest(uri, headers.result())
val webSocketFlow = Http().webSocketClientFlow(request)
val fullFlow = outgoing.map { bs =>
//logger.debug(s"Outgoing message: ${bs.utf8String}")
BinaryMessage(bs)
}.viaMat(webSocketFlow)(Keep.both).map { bm =>
bm match {
case message: TextMessage.Strict =>
//logger.debug(s"Processing incoming message: ${message.text}")
ByteString(message.text)
case message: BinaryMessage.Strict =>
//logger.debug(s"Processing incoming message: ${message.data.utf8String}")
message.data
}
}
val upgradeResponse = Promise[WebSocketUpgradeResponse]
val returnedSource = fullFlow.mapMaterializedValue { case (nu, upgradeF) =>
logger.info(s"Materializing $uri flow")
upgradeResponse.completeWith(upgradeF)
nu
}
val respFuture = upgradeResponse.future.flatMap { upgrade =>
val ct = upgrade.response.headers.filter(_.name()==HeaderNames.CONTENT_TYPE).headOption.map(_.toString())
val rp = messageProtocolFromContentTypeHeader(ct)
val statusCode = upgrade.response.status.intValue()
upgrade match {
case ValidUpgrade(response, chosenSubprotocol) => {
logger.debug(s"Valid websocket upgrade for $uri $chosenSubprotocol $response")
val headers = response.headers.map { header =>
header.name() -> header.value()
}.groupBy(_._1.toLowerCase(Locale.ENGLISH)).map {
case (key, values) => key -> values
}
Future.successful(newResponseHeader(statusCode, rp, headers))
}
case InvalidUpgradeResponse(response, cause) => {
logger.error(s"Invalid websocket upgrade $cause")
import scala.concurrent.duration._
response.entity.toStrict(1.second).map { body =>
val ex = exceptionSerializerDeserializeHttpException(
exceptionSerializer,
statusCode, rp, body.data)
throw ex
}
}
}
}
// There is sort of a fundamental disconnect here between how Lagom works and what Akka-http streaming wants.
// Lagom wants a Future[(ResponseHeader, Source[]]. It assumes you make a one time call to HTTP to set up the
// WebSocket connection, and then the source produces from the connection. The flow isn't materialized until the
// client materializes it.
// Akka-HTTP works differently, it doesn't actually make the initial HTTP call to set up the WebSocket until the
// flow is materialized. The upshot of this is that you don't have a ResponseHeader until you run the graph, but
// the client needs the source to run the graph (which they can't get until the future completes, which must that have a concrete ResponseHeader. Really this should return Future[Future[ResponseHeader], Source] to
// play nicely, but that gets confusing.
//
// The solution here is that we just cheat. We just return a fake Ok response header like everything is great. This
// probably will mess some stuff up eventually.
Future.successful(ResponseHeader.Ok, returnedSource)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment