Skip to content

Instantly share code, notes, and snippets.

@rklaehn
Created January 31, 2015 16:29
Show Gist options
  • Star 40 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save rklaehn/3aa3215046df2c0a7795 to your computer and use it in GitHub Desktop.
Save rklaehn/3aa3215046df2c0a7795 to your computer and use it in GitHub Desktop.
Minimal akka http proxy
package akkahttptest
import akka.actor.ActorSystem
import akka.http.Http
import akka.stream.FlowMaterializer
import akka.http.server._
import akka.http.marshalling.PredefinedToResponseMarshallers._
import akka.stream.scaladsl.{HeadSink, Source}
object Proxy extends App {
implicit val system = ActorSystem("Proxy")
implicit val materializer = FlowMaterializer()
implicit val ec = system.dispatcher
val proxy: Route = Route { context =>
val request = context.request
println("Opening connection to " + request.uri.authority.host.address)
val flow = Http(system).outgoingConnection(request.uri.authority.host.address, 80).flow
Source.single(context.request)
.via(flow)
.runWith(HeadSink())
.flatMap(r => context.complete(r))
}
val binding = Http(system).bind(interface = "localhost", port = 1080)
binding.startHandlingWith(proxy)
}
@fmasion
Copy link

fmasion commented Mar 8, 2017

Hi @adomasGithub,

Request HttpEntity has 3 forms : HttpEntity.Strict, HttpEntity.Default or HttpEntity.Chunked
Only Strict can be consumed multiple times

This is a little tricky because nothing prevents you to build a route using more than ONE Directives that consume this Entity stream.
If you do so you fail with this message without any compilation warning. ( Sad story !! )

  • Everything works fine, you add some kind of logger and everything fails !!!?

  • Worse : you test with chrome everything works fine but it won't with firefox => because each browser/version/os might have a different entity size limit to choose if they send it in Strict mode or Default

Depending on your use case one solution could be to make all your requests Strict like in this exemple code :

trait ToStrict extends BasicDirectives with Directives{
  private val log = Logging(system,getClass)

  def makeStrict(timeout: FiniteDuration): Directive1[HttpRequest] = {
    extractStrict(timeout).flatMap{strictRequest =>
      mapRequest((r:HttpRequest) => strictRequest).tflatMap{u=>
        provide(strictRequest)
      }
    }
  }

  private def extractStrict(timeout: FiniteDuration): Directive1[HttpRequest] = {
    for {
      request <- extractRequest
      strictT <- onComplete(strictify(request,timeout))
    }yield{
      strictT match{
        case Success(strict) => request.copy(entity = strict)
        case Failure(scheisse) =>
          log.error("ACH mein leben ! " + scheisse)
          request
      }
    }
  }

  private def strictify(request : HttpRequest, duration : FiniteDuration): Future[Strict] = {
    request.entity match {
      case e @ HttpEntity.Strict(contentType: ContentType, data: ByteString) =>
        Future.successful(e)
      case e @ HttpEntity.Default(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]) =>
        e.toStrict(duration)
      case e @ HttpEntity.Chunked(contentType: ContentType, chunks: Source[ChunkStreamPart, Any]) =>
        e.toStrict(duration)
    }
  }

This solution isn't perfect :

  • You have some unnecessary overhead
  • You don't start the request immediately by holding the entire Entity in memory before starting to forward it
  • You cannot stream data anymore

AFAIK there's no way to have garanties that you won't use more than one Directive that consumes the Entity.

One more thing : Don't forget to drain your Entities if you don't need them

Fred

@adomasGithub
Copy link

@fmasion very informative answer, it helped, thanks 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment