Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@rklaehn
Created January 31, 2015 16:29
Show Gist options
  • Star 40 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save rklaehn/3aa3215046df2c0a7795 to your computer and use it in GitHub Desktop.
Save rklaehn/3aa3215046df2c0a7795 to your computer and use it in GitHub Desktop.
Minimal akka http proxy
package akkahttptest
import akka.actor.ActorSystem
import akka.http.Http
import akka.stream.FlowMaterializer
import akka.http.server._
import akka.http.marshalling.PredefinedToResponseMarshallers._
import akka.stream.scaladsl.{HeadSink, Source}
object Proxy extends App {
implicit val system = ActorSystem("Proxy")
implicit val materializer = FlowMaterializer()
implicit val ec = system.dispatcher
val proxy: Route = Route { context =>
val request = context.request
println("Opening connection to " + request.uri.authority.host.address)
val flow = Http(system).outgoingConnection(request.uri.authority.host.address, 80).flow
Source.single(context.request)
.via(flow)
.runWith(HeadSink())
.flatMap(r => context.complete(r))
}
val binding = Http(system).bind(interface = "localhost", port = 1080)
binding.startHandlingWith(proxy)
}
@alari
Copy link

alari commented May 19, 2015

Hi. Is it really enough to pass through everything? Headers, body, chunked response?

@kovacshuni
Copy link

According to akka-http-scala-experimental 1.0-RC2

package sample.stream

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.Route
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Sink, Source}

object Proxy extends App {

  implicit val system = ActorSystem("Proxy")
  implicit val materializer = ActorFlowMaterializer()
  implicit val ec = system.dispatcher

  val proxy = Route { context =>
    val request = context.request
    println("Opening connection to " + request.uri.authority.host.address)
    val flow = Http(system).outgoingConnection(request.uri.authority.host.address(), 8080)
    val handler = Source.single(context.request)
      .map(r => r.withHeaders(RawHeader("x-authenticated", "someone")))
      .via(flow)
      .runWith(Sink.head)
      .flatMap(context.complete(_))
    handler
  }

  val binding = Http(system).bindAndHandle(handler = proxy, interface = "localhost", port = 9000)
}

@zarinfam
Copy link

According to akka-http 2.4.x :

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

object HttpProxy extends App {

  implicit val system = ActorSystem("Proxy")
  implicit val materializer = ActorMaterializer()
  implicit val ec = system.dispatcher

  val proxy = Route { context =>
    val request = context.request
    println("Opening connection to " + request.uri.authority.host.address)
    val flow = Http(system).outgoingConnection(request.uri.authority.host.address(), 80)
    val handler = Source.single(context.request)
      .via(flow)
      .runWith(Sink.head)
      .flatMap(context.complete(_))
    handler
  }

  val binding = Http(system).bindAndHandle(handler = proxy, interface = "localhost", port = 1080)
}

@DanyMariaLee
Copy link

Hey!
I'm doing exactly what u wrote, but I still get

[akka://Proxy/system/IO-TCP/selectors/$a/2] Could not establish connection to [localhost/127.0.0.1:80] due to java.net.ConnectException: Connection refused

Can u help?

@ellbur
Copy link

ellbur commented Mar 2, 2017

I was proxying a large POST file upload and getting InvalidContentLengthException. I'm not sure why... Anyway, I changed it to

val flow = Http(actorSystem).outgoingConnection(ipAddress, Ports.workerPort)

val req = context.request
val proxyReq = HttpRequest(method = req.method, uri = Uri.from(path = req.uri.path.toString), entity = req.entity)
    
Source.single(proxyReq)
      .via(flow)
      .runWith(Sink.head)
      .flatMap(context.complete(_))

and now it works. Seems to pass chunked just fine. Of course, the above code discards headers, but you could add them back in by searching through req.headers for the ones you need (though I have a suspicion one of the headers might be related to the InvalidContentLengthException).

@adomasGithub
Copy link

adomasGithub commented Mar 3, 2017

I need to unmarshall request body check it's content and make proxy request with the the same request. I got the error 'java.lang.IllegalStateException: Substream Source cannot be materialized more than once' How I can avoid this and reuse request twice?

@fmasion
Copy link

fmasion commented Mar 8, 2017

Hi @adomasGithub,

Request HttpEntity has 3 forms : HttpEntity.Strict, HttpEntity.Default or HttpEntity.Chunked
Only Strict can be consumed multiple times

This is a little tricky because nothing prevents you to build a route using more than ONE Directives that consume this Entity stream.
If you do so you fail with this message without any compilation warning. ( Sad story !! )

  • Everything works fine, you add some kind of logger and everything fails !!!?

  • Worse : you test with chrome everything works fine but it won't with firefox => because each browser/version/os might have a different entity size limit to choose if they send it in Strict mode or Default

Depending on your use case one solution could be to make all your requests Strict like in this exemple code :

trait ToStrict extends BasicDirectives with Directives{
  private val log = Logging(system,getClass)

  def makeStrict(timeout: FiniteDuration): Directive1[HttpRequest] = {
    extractStrict(timeout).flatMap{strictRequest =>
      mapRequest((r:HttpRequest) => strictRequest).tflatMap{u=>
        provide(strictRequest)
      }
    }
  }

  private def extractStrict(timeout: FiniteDuration): Directive1[HttpRequest] = {
    for {
      request <- extractRequest
      strictT <- onComplete(strictify(request,timeout))
    }yield{
      strictT match{
        case Success(strict) => request.copy(entity = strict)
        case Failure(scheisse) =>
          log.error("ACH mein leben ! " + scheisse)
          request
      }
    }
  }

  private def strictify(request : HttpRequest, duration : FiniteDuration): Future[Strict] = {
    request.entity match {
      case e @ HttpEntity.Strict(contentType: ContentType, data: ByteString) =>
        Future.successful(e)
      case e @ HttpEntity.Default(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]) =>
        e.toStrict(duration)
      case e @ HttpEntity.Chunked(contentType: ContentType, chunks: Source[ChunkStreamPart, Any]) =>
        e.toStrict(duration)
    }
  }

This solution isn't perfect :

  • You have some unnecessary overhead
  • You don't start the request immediately by holding the entire Entity in memory before starting to forward it
  • You cannot stream data anymore

AFAIK there's no way to have garanties that you won't use more than one Directive that consumes the Entity.

One more thing : Don't forget to drain your Entities if you don't need them

Fred

@adomasGithub
Copy link

@fmasion very informative answer, it helped, thanks 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment