-
-
Save rklaehn/3aa3215046df2c0a7795 to your computer and use it in GitHub Desktop.
package akkahttptest | |
import akka.actor.ActorSystem | |
import akka.http.Http | |
import akka.stream.FlowMaterializer | |
import akka.http.server._ | |
import akka.http.marshalling.PredefinedToResponseMarshallers._ | |
import akka.stream.scaladsl.{HeadSink, Source} | |
object Proxy extends App { | |
implicit val system = ActorSystem("Proxy") | |
implicit val materializer = FlowMaterializer() | |
implicit val ec = system.dispatcher | |
val proxy: Route = Route { context => | |
val request = context.request | |
println("Opening connection to " + request.uri.authority.host.address) | |
val flow = Http(system).outgoingConnection(request.uri.authority.host.address, 80).flow | |
Source.single(context.request) | |
.via(flow) | |
.runWith(HeadSink()) | |
.flatMap(r => context.complete(r)) | |
} | |
val binding = Http(system).bind(interface = "localhost", port = 1080) | |
binding.startHandlingWith(proxy) | |
} |
According to akka-http-scala-experimental 1.0-RC2
package sample.stream
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.Route
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.{Sink, Source}
object Proxy extends App {
implicit val system = ActorSystem("Proxy")
implicit val materializer = ActorFlowMaterializer()
implicit val ec = system.dispatcher
val proxy = Route { context =>
val request = context.request
println("Opening connection to " + request.uri.authority.host.address)
val flow = Http(system).outgoingConnection(request.uri.authority.host.address(), 8080)
val handler = Source.single(context.request)
.map(r => r.withHeaders(RawHeader("x-authenticated", "someone")))
.via(flow)
.runWith(Sink.head)
.flatMap(context.complete(_))
handler
}
val binding = Http(system).bindAndHandle(handler = proxy, interface = "localhost", port = 9000)
}
According to akka-http 2.4.x :
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.RawHeader
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
object HttpProxy extends App {
implicit val system = ActorSystem("Proxy")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val proxy = Route { context =>
val request = context.request
println("Opening connection to " + request.uri.authority.host.address)
val flow = Http(system).outgoingConnection(request.uri.authority.host.address(), 80)
val handler = Source.single(context.request)
.via(flow)
.runWith(Sink.head)
.flatMap(context.complete(_))
handler
}
val binding = Http(system).bindAndHandle(handler = proxy, interface = "localhost", port = 1080)
}
Hey!
I'm doing exactly what u wrote, but I still get
[akka://Proxy/system/IO-TCP/selectors/$a/2] Could not establish connection to [localhost/127.0.0.1:80] due to java.net.ConnectException: Connection refused
Can u help?
I was proxying a large POST file upload and getting InvalidContentLengthException
. I'm not sure why... Anyway, I changed it to
val flow = Http(actorSystem).outgoingConnection(ipAddress, Ports.workerPort)
val req = context.request
val proxyReq = HttpRequest(method = req.method, uri = Uri.from(path = req.uri.path.toString), entity = req.entity)
Source.single(proxyReq)
.via(flow)
.runWith(Sink.head)
.flatMap(context.complete(_))
and now it works. Seems to pass chunked just fine. Of course, the above code discards headers, but you could add them back in by searching through req.headers
for the ones you need (though I have a suspicion one of the headers might be related to the InvalidContentLengthException
).
I need to unmarshall request body check it's content and make proxy request with the the same request. I got the error 'java.lang.IllegalStateException: Substream Source cannot be materialized more than once' How I can avoid this and reuse request twice?
Hi @adomasGithub,
Request HttpEntity has 3 forms : HttpEntity.Strict, HttpEntity.Default or HttpEntity.Chunked
Only Strict can be consumed multiple times
This is a little tricky because nothing prevents you to build a route using more than ONE Directives that consume this Entity stream.
If you do so you fail with this message without any compilation warning. ( Sad story !! )
-
Everything works fine, you add some kind of logger and everything fails !!!?
-
Worse : you test with chrome everything works fine but it won't with firefox => because each browser/version/os might have a different entity size limit to choose if they send it in Strict mode or Default
Depending on your use case one solution could be to make all your requests Strict like in this exemple code :
trait ToStrict extends BasicDirectives with Directives{
private val log = Logging(system,getClass)
def makeStrict(timeout: FiniteDuration): Directive1[HttpRequest] = {
extractStrict(timeout).flatMap{strictRequest =>
mapRequest((r:HttpRequest) => strictRequest).tflatMap{u=>
provide(strictRequest)
}
}
}
private def extractStrict(timeout: FiniteDuration): Directive1[HttpRequest] = {
for {
request <- extractRequest
strictT <- onComplete(strictify(request,timeout))
}yield{
strictT match{
case Success(strict) => request.copy(entity = strict)
case Failure(scheisse) =>
log.error("ACH mein leben ! " + scheisse)
request
}
}
}
private def strictify(request : HttpRequest, duration : FiniteDuration): Future[Strict] = {
request.entity match {
case e @ HttpEntity.Strict(contentType: ContentType, data: ByteString) =>
Future.successful(e)
case e @ HttpEntity.Default(contentType: ContentType, contentLength: Long, data: Source[ByteString, Any]) =>
e.toStrict(duration)
case e @ HttpEntity.Chunked(contentType: ContentType, chunks: Source[ChunkStreamPart, Any]) =>
e.toStrict(duration)
}
}
This solution isn't perfect :
- You have some unnecessary overhead
- You don't start the request immediately by holding the entire Entity in memory before starting to forward it
- You cannot stream data anymore
AFAIK there's no way to have garanties that you won't use more than one Directive that consumes the Entity.
One more thing : Don't forget to drain your Entities if you don't need them
Fred
@fmasion very informative answer, it helped, thanks 👍
Hi. Is it really enough to pass through everything? Headers, body, chunked response?