Skip to content

Instantly share code, notes, and snippets.

@davidmweber
Created January 22, 2016 11:49
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save davidmweber/727dff8db94d76cb5143 to your computer and use it in GitHub Desktop.
Save davidmweber/727dff8db94d76cb5143 to your computer and use it in GitHub Desktop.
Using Akka http client and pooled requests
package co.horn.streaming
import akka.actor.ActorSystem
import akka.http.ConnectionPoolSettings
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpResponse, HttpRequest}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable.SortedMap
import scala.concurrent.Future
import scala.util.{Failure, Success}
/**
 * REST client dispatcher that issues requests through an Akka HTTP cached host
 * connection pool.
 *
 * @param address The target server's address
 * @param port The target server's port
 * @param poolSettings Settings for this particular connection pool
 * @param system An actor system in which to execute the requests
 * @param materializer A flow materializer
 */
case class RestClient(address: String, port: Int, poolSettings: ConnectionPoolSettings)
                     (implicit val system: ActorSystem, implicit val materializer: ActorMaterializer) {

  import system.dispatcher

  // Every request pushed through the pool is tagged with an Int; the pool hands
  // the same tag back alongside the corresponding result.
  private val pool = Http().cachedHostConnectionPool[Int](address, port, poolSettings)

  /**
   * Execute a single request using the connection pool.
   *
   * @param req An HttpRequest
   * @return The response, or a failed Future carrying the error reported by the pool
   */
  def exec(req: HttpRequest): Future[HttpResponse] =
    Source.single(req -> 1)
      .via(pool)
      .runWith(Sink.head)
      .flatMap {
        case (Success(response: HttpResponse), _) => Future.successful(response)
        case (Failure(cause), _) => Future.failed(cause)
      }

  /**
   * Take some sequence of requests and pipeline them through the connection pool.
   * Return whatever responses we get as a flattened sequence with the answers in the
   * same order as the original sequence. Zipping the request and response lists will
   * result in tuples of corresponding requests and responses.
   *
   * The returned Future fails if any individual request failed.
   *
   * @param requests A list of requests that should be simultaneously issued to the pool
   * @return The responses in the same order as they were submitted
   */
  def execFlatten(requests: Iterable[HttpRequest]): Future[Iterable[HttpResponse]] =
    runIndexed(requests).flatMap(byIndex => Future.sequence(byIndex.values))

  /**
   * Take some sequence of requests and pipeline them through the connection pool.
   * Return whatever responses we get as a sequence of futures that will be ordered
   * in such a way that zipping the request and response lists will result
   * in tuples of corresponding requests and responses.
   *
   * @param requests A list of requests that should be simultaneously issued to the pool
   * @return The Future responses in the same order as they were submitted
   */
  def exec(requests: Iterable[HttpRequest]): Future[Iterable[Future[HttpResponse]]] =
    runIndexed(requests).map(byIndex => byIndex.values)

  /**
   * Push every request through the pool tagged with its position in `requests` and
   * collect the per-request outcomes into a map sorted by that position, so the
   * results can be read back in submission order regardless of the order in which
   * the pool emits them.
   */
  private def runIndexed(
      requests: Iterable[HttpRequest]): Future[SortedMap[Int, Future[HttpResponse]]] =
    // A List (not a Map keyed by request) keeps one entry per submitted request:
    // a Map would collapse requests that compare equal and silently drop them.
    Source(requests.zipWithIndex.toList)
      .via(pool)
      .runFold(SortedMap.empty[Int, Future[HttpResponse]]) {
        case (acc, (Success(response), idx)) => acc + (idx -> Future.successful(response))
        case (acc, (Failure(cause), idx)) => acc + (idx -> Future.failed(cause))
      }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment