Created
January 12, 2018 19:10
-
-
Save DaveDeCaprio/4db9d36a5e907fb5810c00e919347aa3 to your computer and use it in GitHub Desktop.
Lagom websockets client using Akka HTTP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.net.URI | |
import java.util.Locale | |
import akka.NotUsed | |
import akka.actor.ActorSystem | |
import akka.http.scaladsl.Http | |
import akka.http.scaladsl.model.headers.RawHeader | |
import akka.http.scaladsl.model.ws._ | |
import akka.http.scaladsl.model.{HttpHeader, Uri} | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.{Keep, Source} | |
import akka.util.ByteString | |
import com.lightbend.lagom.internal.scaladsl.client.ScaladslWebSocketClient | |
import com.lightbend.lagom.scaladsl.api.transport.ResponseHeader | |
import com.typesafe.scalalogging.StrictLogging | |
import io.netty.handler.codec.http.websocketx._ | |
import play.api.Environment | |
import play.api.http.HeaderNames | |
import play.api.inject.ApplicationLifecycle | |
import scala.collection.mutable.ListBuffer | |
import scala.concurrent.{ExecutionContext, Future, Promise} | |
/** Lagom client application whose WebSocket client is an [[AkkaHttpWebSocketClient]].
  *
  * @param name forwarded unchanged to the `LagomClientApplication` constructor
  */
abstract class LagomClientApplicationAkkaWebsockets(name: String)
  extends LagomClientApplication(name) {

  /** Created on first access from this application's environment, lifecycle and execution context. */
  override lazy val scaladslWebSocketClient: ScaladslWebSocketClient = {
    val akkaBackedClient =
      new AkkaHttpWebSocketClient(environment, applicationLifecycle)(executionContext)
    akkaBackedClient
  }
}
/** Lagom WebSocket client that opens its connections with Akka HTTP.
  *
  * This client works with WebSockets over TLS: `https` addresses are automatically converted to `wss` calls.
  *
  * It has the major downside that the `ResponseHeader` returned by [[connect]] is fake. Therefore, if you have
  * custom processing of response headers, it won't work correctly here.
  *
  * Also, the flow changes slightly: no HTTP call is made until the returned `Source` is materialized and run.
  * The Netty client, by contrast, eagerly makes the HTTP call and then leaves the WebSocket open. This differs
  * from the regular Lagom flow and is worth knowing about. It is basically a philosophical difference in how
  * akka-http and Netty handle this, and it was hard to work around.
  *
  * @param environment          forwarded to `ScaladslWebSocketClient`
  * @param applicationLifecycle forwarded to `ScaladslWebSocketClient`
  * @param ec                   execution context used for every future composed by this client
  */
class AkkaHttpWebSocketClient(environment: Environment, applicationLifecycle: ApplicationLifecycle)(implicit ec: ExecutionContext)
  extends ScaladslWebSocketClient(environment, applicationLifecycle) with StrictLogging {

  /** Actor system owned by this client; it is terminated in [[shutdown]]. */
  implicit val system: ActorSystem = ActorSystem("AkkaHttpWebSocketClient")

  /** Materializer used to run the streamed parts of incoming WebSocket messages. */
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  /** Terminates the actor system and shuts the parent client down.
    *
    * Both shutdowns are started immediately, in the same order as before.
    *
    * @return a future that completes only once BOTH the actor system and the parent client have finished
    *         stopping (previously the actor system's termination future was discarded)
    */
  override def shutdown(): Future[_] = {
    logger.debug("Shutting down web socket client")
    val systemTerminated = system.terminate()
    val parentStopped = super.shutdown()
    for {
      _ <- systemTerminated
      _ <- parentStopped
    } yield ()
  }

  /** Prepares a WebSocket connection for `requestHeader`.
    *
    * Nothing is sent over the network until the returned source is materialized.
    *
    * @param exceptionSerializer used to deserialize the error body of a rejected upgrade
    * @param version             not used by this implementation
    * @param requestHeader       supplies the target URI, protocol, accepted protocols and custom headers
    * @param outgoing            bytes to send; each element is sent as one binary WebSocket message
    * @return an already-completed future holding a placeholder `ResponseHeader.Ok` and the source of
    *         incoming message payloads
    * @throws IllegalArgumentException if the URI scheme is not one of http, https, ws or wss
    */
  override def connect(exceptionSerializer: ExceptionSerializer, version: WebSocketVersion, requestHeader: RequestHeader,
                       outgoing: Source[ByteString, NotUsed]): Future[(ResponseHeader, Source[ByteString, NotUsed])] = {
    logger.debug("Web socket client connecting")

    val target = akkaTargetUri(requestHeader)
    val uri = Uri(target.toASCIIString).copy(scheme = akkaWebSocketScheme(target.getScheme))
    val request = WebSocketRequest(uri, akkaRequestHeaders(requestHeader))
    val webSocketFlow = Http().webSocketClientFlow(request)

    // mapAsync(1) keeps message order while letting streamed messages be collected asynchronously.
    val fullFlow = outgoing
      .map(bytes => BinaryMessage(bytes))
      .viaMat(webSocketFlow)(Keep.both)
      .mapAsync(parallelism = 1)(akkaMessageBytes)

    // The upgrade response only exists once the caller materializes the source, so it is routed
    // through a promise that the materialization callback completes.
    val upgradeResponse = Promise[WebSocketUpgradeResponse]()
    val returnedSource = fullFlow.mapMaterializedValue { case (notUsed, upgradeF) =>
      logger.info(s"Materializing $uri flow")
      upgradeResponse.completeWith(upgradeF)
      notUsed
    }

    val realResponseHeader: Future[ResponseHeader] = upgradeResponse.future.flatMap { upgrade =>
      // NOTE(review): Akka HTTP usually models Content-Type on the entity rather than in `headers`,
      // so this lookup may never match - confirm.
      val contentType = upgrade.response.headers.find(_.is("content-type")).map(_.value())
      val protocol = messageProtocolFromContentTypeHeader(contentType)
      val statusCode = upgrade.response.status.intValue()
      upgrade match {
        case ValidUpgrade(response, chosenSubprotocol) =>
          logger.debug(s"Valid websocket upgrade for $uri $chosenSubprotocol $response")
          val headers = response.headers
            .map(header => header.name() -> header.value())
            .groupBy(_._1.toLowerCase(Locale.ENGLISH))
          Future.successful(newResponseHeader(statusCode, protocol, headers))

        case InvalidUpgradeResponse(response, cause) =>
          logger.error(s"Invalid websocket upgrade $cause")
          import scala.concurrent.duration._
          response.entity.toStrict(1.second).flatMap { body =>
            Future.failed[ResponseHeader](
              exceptionSerializerDeserializeHttpException(exceptionSerializer, statusCode, protocol, body.data))
          }
      }
    }
    // The real header cannot be handed to the caller (see below), so at least make its failures
    // visible instead of letting them disappear in a future nobody observes.
    realResponseHeader.failed.foreach { error =>
      logger.error(s"Web socket upgrade for $uri failed", error)
    }

    // There is a fundamental disconnect here between how Lagom works and what Akka HTTP streaming wants.
    // Lagom wants a Future[(ResponseHeader, Source[...])]. It assumes you make a one-time HTTP call to set up
    // the WebSocket connection, and then the source produces from the connection. The flow isn't materialized
    // until the client materializes it.
    // Akka HTTP works differently: it doesn't make the initial HTTP call to set up the WebSocket until the
    // flow is materialized. The upshot is that there is no ResponseHeader until the graph runs, but the client
    // needs the source to run the graph, and it can't get the source until the future completes, which in
    // turn requires a concrete ResponseHeader. Really this should return
    // Future[(Future[ResponseHeader], Source[...])] to play nicely, but that gets confusing.
    //
    // The solution here is that we just cheat: we return a fake Ok response header as if everything is
    // great. This will probably mess some stuff up eventually.
    Future.successful((ResponseHeader.Ok, returnedSource))
  }

  /** Normalizes the request URI and substitutes "/" when it has no (or only a blank) path. */
  private def akkaTargetUri(requestHeader: RequestHeader): URI = {
    val normalized = requestHeaderUri(requestHeader).normalize()
    val pathIsBlank = Option(normalized.getPath).forall(_.trim.isEmpty)
    if (pathIsBlank) {
      new URI(normalized.getScheme, normalized.getAuthority, "/", normalized.getQuery, normalized.getFragment)
    } else {
      normalized
    }
  }

  /** Builds the handshake headers: Content-Type, Accept, then every custom header of the request. */
  private def akkaRequestHeaders(requestHeader: RequestHeader): List[HttpHeader] = {
    val contentType = messageProtocolToContentTypeHeader(messageHeaderProtocol(requestHeader))
      .map(value => RawHeader(HeaderNames.CONTENT_TYPE, value))
      .toList

    val acceptValue = requestHeaderAcceptedResponseProtocols(requestHeader)
      .flatMap(protocol => messageProtocolToContentTypeHeader(protocol))
      .mkString(", ")
    val accept = if (acceptValue.nonEmpty) List(RawHeader(HeaderNames.ACCEPT, acceptValue)) else Nil

    val custom = for {
      (_, values) <- messageHeaderHeaders(requestHeader).toList
      (headerName, headerValue) <- values
    } yield RawHeader(headerName, headerValue)

    contentType ++ accept ++ custom
  }

  /** Maps an HTTP(S) scheme to its WebSocket counterpart; `ws` and `wss` pass through unchanged.
    *
    * The comparison is case-insensitive. A missing or unsupported scheme is rejected with an
    * `IllegalArgumentException` rather than an opaque `MatchError`.
    */
  private def akkaWebSocketScheme(scheme: String): String =
    Option(scheme).map(_.toLowerCase(Locale.ENGLISH)) match {
      case Some("http") | Some("ws") => "ws"
      case Some("https") | Some("wss") => "wss"
      case _ =>
        throw new IllegalArgumentException(
          s"Cannot open a web socket for URI scheme '$scheme'; expected http, https, ws or wss")
    }

  /** Extracts the payload of an incoming message as bytes (text is converted with `ByteString(text)`).
    *
    * Strict messages are converted directly. Streamed messages, which previously crashed the stream
    * with a `MatchError`, are collected in full first; note that this buffers the whole message in memory.
    */
  private def akkaMessageBytes(message: Message): Future[ByteString] = message match {
    case TextMessage.Strict(text) =>
      Future.successful(ByteString(text))
    case BinaryMessage.Strict(data) =>
      Future.successful(data)
    case streamed: TextMessage =>
      streamed.textStream
        .runFold(new StringBuilder)((collected, chunk) => collected.append(chunk))
        .map(collected => ByteString(collected.toString))
    case streamed: BinaryMessage =>
      streamed.dataStream.runFold(ByteString.empty)((collected, chunk) => collected ++ chunk)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment