Skip to content

Instantly share code, notes, and snippets.

@eyalroth
Last active October 17, 2018 14:43
Show Gist options
  • Save eyalroth/9864170d7e2aae06e643192998a60f2c to your computer and use it in GitHub Desktop.
Save eyalroth/9864170d7e2aae06e643192998a60f2c to your computer and use it in GitHub Desktop.
akka-http #2257 - Http Client
import akka.actor.ActorSystem
import akka.http.scaladsl.coding.Gzip
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, ResponseEntity, StatusCode}
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.http.scaladsl.{ConnectionContext, Http}
import akka.stream._
import akka.stream.scaladsl.{Keep, Sink, Source}
import com.typesafe.scalalogging.StrictLogging
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import javax.net.ssl._
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
/**
 * HTTP client bound to a single `host`/`port` that funnels every request through a bounded
 * queue feeding a cached akka-http host connection pool.
 *
 * @param host         target host of the connection pool
 * @param port         target port of the connection pool
 * @param queueSize    buffer size of the request queue
 * @param secure       when `true` the HTTPS pool is used and `scheme` is "https", otherwise plain HTTP
 * @param materializer materializer used to run the request stream and to read entities
 */
class HttpClient(val host: String, val port: Int, queueSize: Int, secure: Boolean)(
    implicit val materializer: ActorMaterializer)
    extends StrictLogging {

  /* --- Data Members --- */

  /** Execution context of the materializer; drives every `Future` combinator in this class. */
  implicit val ec: scala.concurrent.ExecutionContextExecutor = materializer.executionContext

  /** Upper bound on how long an entity may take to be fully read into memory. */
  private val entityReadTimeout: FiniteDuration = 300.millis

  /** Connection pool flow; each request travels with the promise that receives its response. */
  private val cachedConnectionFlow = {
    if (secure) {
      // NOTE(review): judging by their names, AcceptAllHostnameVerifier and AcceptAllX509TrustManager
      // (defined outside this file) disable hostname and certificate verification - confirm that this
      // is intended before pointing the client at an untrusted network.
      val sslConfig = AkkaSSLConfig.get(materializer.system).mapSettings { settings =>
        settings.withHostnameVerifierClass(classOf[AcceptAllHostnameVerifier].asInstanceOf[Class[HostnameVerifier]])
      }
      val sslContext = SSLContext.getInstance("TLS")
      sslContext.init(Array[KeyManager](), Array(AcceptAllX509TrustManager), null)
      val httpsContext = ConnectionContext.https(sslContext, Some(sslConfig))
      Http()(materializer.system).cachedHostConnectionPoolHttps[Promise[HttpResponse]](host, port, httpsContext)
    } else {
      Http()(materializer.system).cachedHostConnectionPool[Promise[HttpResponse]](host, port)
    }
  }

  /** Running request stream: queue -> connection pool -> completion of the per-request promise. */
  private val queue =
    Source
      .queue[(HttpRequest, Promise[HttpResponse])](queueSize, OverflowStrategy.backpressure)
      .via(cachedConnectionFlow)
      .toMat(Sink.foreach({
        case ((Success(resp), p)) => p.success(resp)
        case ((Failure(e), p))    => p.failure(e)
      }))(Keep.left)
      .run()

  /* --- Constructors --- */

  /**
   * Creates a client whose materializer runs on the dispatcher `akka.dispatchers.<dispatcherName>`
   * of the given actor system.
   */
  def this(host: String, port: Int, queueSize: Int, secure: Boolean, system: ActorSystem, dispatcherName: String) = {
    this(host, port, queueSize, secure)(
      ActorMaterializer(ActorMaterializerSettings(system).withDispatcher(s"akka.dispatchers.$dispatcherName"))(system))
  }

  /* --- Methods --- */

  /* --- Public Methods --- */

  val scheme: String = if (secure) "https" else "http"

  val baseUri: String = s"$scheme://$host:$port/api/v1"

  /**
   * Gzip-encodes `request`, offers it to the request queue and returns the eventual response.
   *
   * A response with a failure status that is not listed in `ignoredStatusCodes` is logged together
   * with the request and response bodies. To make that possible without consuming the entity the
   * caller is about to read, the response entity is first buffered into memory and the returned
   * response carries the buffered entity.
   *
   * @param request            request to send
   * @param ignoredStatusCodes failure status codes that must not be logged as errors
   * @return the response; the future fails when the request could not be enqueued or the request failed
   */
  def queueRequest(request: HttpRequest, ignoredStatusCodes: Set[StatusCode] = Set()): Future[HttpResponse] = {
    val responsePromise = Promise[HttpResponse]()
    queue
      .offer(Gzip.encodeMessage(request) -> responsePromise)
      .flatMap {
        case QueueOfferResult.Enqueued => responsePromise.future
        case QueueOfferResult.Dropped =>
          Future.failed(new RuntimeException("Queue overflowed. Try again later."))
        case QueueOfferResult.QueueClosed =>
          Future.failed(
            new RuntimeException("Queue was closed (pool shut down) while running the request. Try again later."))
        case QueueOfferResult.Failure(ex) => Future.failed(ex)
      }
      .flatMap { rawResponse =>
        if (rawResponse.status.isFailure() && !ignoredStatusCodes.contains(rawResponse.status)) {
          // Buffer first, log afterwards: the log reads the in-memory copy, so the caller still
          // receives an entity that can be read.
          bufferEntity(rawResponse).andThen {
            case Success(response) =>
              for (requestBody <- getBody(request.entity, Some(1000)); responseBody <- getBody(response.entity)) {
                logger.error(
                  s"HTTP request to the server returned bad status. Code: ${response.status}. URI: ${request.uri}. Request Body: $requestBody. Response Body: $responseBody")
              }
          }
        } else {
          Future.successful(rawResponse)
        }
      }
      .andThen {
        case Failure(e) =>
          for (body <- getBody(request.entity, Some(1000))) {
            logger.error(s"HTTP request to the server failed. URI: ${request.uri}. Body: $body", e)
          }
      }
  }

  /**
   * Sends `request` through [[queueRequest]] and unmarshals the response to `A`.
   *
   * An unmarshalling failure is logged along with the request URI and (truncated) request body,
   * and is then propagated unchanged.
   *
   * @param request            request to send
   * @param ignoredStatusCodes failure status codes that must not be logged as errors
   * @param um                 unmarshaller turning the response into `A`
   */
  def queueAndUnmarshal[A](request: HttpRequest, ignoredStatusCodes: Set[StatusCode] = Set())(
      implicit um: Unmarshaller[HttpResponse, A]): Future[A] = {
    queueRequest(request, ignoredStatusCodes).flatMap { response =>
      Unmarshal(response).to[A].recoverWith {
        case NonFatal(e) =>
          for (body <- getBody(request.entity, Some(1000))) {
            logger.error(s"Failed to unmarshal response. URI: ${request.uri}. Body: $body", e)
          }
          Future.failed(e)
      }
    }
  }

  /* --- Private Methods --- */

  /**
   * Reads the response entity fully into memory and returns the response with that strict entity.
   * When buffering fails the problem is logged and the original response is returned as is.
   */
  private def bufferEntity(response: HttpResponse): Future[HttpResponse] = {
    response.entity
      .toStrict(entityReadTimeout)(materializer)
      .map(strict => response.withEntity(strict))
      .recover {
        case NonFatal(e) =>
          logger.error("Failed to buffer response entity", e)
          response
      }
  }

  /**
   * Renders an entity as a UTF-8 string for logging.
   *
   * @param entity  entity to read
   * @param maxSize when defined, bodies longer than this many characters are cut and marked as truncated
   * @return the body text, or a placeholder string when the entity could not be read
   */
  private def getBody(entity: ResponseEntity, maxSize: Option[Int] = None): Future[String] = {
    entity
      .toStrict(entityReadTimeout)(materializer)
      .map { strict =>
        val body = strict.data.utf8String
        maxSize
          .filter(_ < body.length)
          .fold(body)(limit => body.take(limit) + " ... (truncated)")
      }
      .recover {
        case NonFatal(e) =>
          logger.error("Failed to get body", e)
          "NaN (failed to get body)"
      }
  }
}
object HttpClient {

  /* --- Constructors --- */

  /**
   * Builds a client from configuration.
   *
   * Reads the keys `host`, `port`, `requests-queue-size`, the optional `secure` flag (treated as
   * `true` when absent) and `dispatcher`, then delegates to the dispatcher-aware constructor.
   *
   * @param config configuration section holding the keys listed above
   * @param system actor system the client's materializer is created from
   */
  def apply(config: TypesafeConfiguration, system: ActorSystem): HttpClient =
    new HttpClient(
      host = config.getString("host"),
      port = config.getInt("port"),
      queueSize = config.getInt("requests-queue-size"),
      secure = config.getOptional[Boolean]("secure").getOrElse(true),
      system = system,
      dispatcherName = config.getString("dispatcher")
    )
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment