Last active
August 29, 2015 14:11
-
-
Save magnusart/bffe1a15dab30f794235 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package proxy | |
import scala.concurrent.duration._ | |
import scala.concurrent.Future | |
import akka.io.IO | |
import akka.http.Http | |
import akka.http.Http.OutgoingConnection | |
import akka.stream.scaladsl._ | |
import akka.stream.FlattenStrategy | |
import akka.stream.scaladsl.FlowGraphImplicits._ | |
import akka.stream.{ MaterializerSettings, FlowMaterializer } | |
import akka.actor.ActorSystem | |
import akka.util.Timeout | |
import scala.util.{ Failure, Success, Try } | |
import akka.http.model.HttpMethods._ | |
import akka.http.model.StatusCodes._ | |
import akka.http.model._ | |
import com.typesafe.config.{ ConfigFactory, Config } | |
/**
 * Runnable entry point for the proxy.
 *
 * Parses the configuration embedded below, derives from it the members
 * required by [[Proxy]] (bind interface, port and actor system) and then
 * starts the service.
 */
object ProxyService extends App with Proxy {

  /** Inline HOCON source holding both the Akka and the proxy settings. */
  val configFile: String =
    """
      |akka {
      | event-handlers = ["akka.event.Logging$DefaultLogger"]
      | loglevel = "DEBUG"
      | actor {
      | provider = "akka.actor.LocalActorRefProvider"
      | }
      | log-dead-letters-during-shutdown = false
      | log-dead-letters = 0
      |}
      |proxy {
      | service {
      | name = "ProxySystem"
      | http.interface = 127.0.0.1
      | http.port = 7080
      | }
      |}""".stripMargin

  /** Parsed form of [[configFile]]. */
  val config: Config = ConfigFactory.parseString( configFile )

  // Settings read from the `proxy.service` section.
  val systemName: String = config.getString( "proxy.service.name" )
  val interface: String = config.getString( "proxy.service.http.interface" )
  val port: Int = config.getInt( "proxy.service.http.port" )

  /** Actor system named after `proxy.service.name`, created with the parsed config. */
  val systemRef: ActorSystem = ActorSystem( systemName, config )

  // Everything Proxy needs is assigned above; bring the HTTP service up.
  startService
}
/**
 * HTTP front end that answers `GET /proxy` with the response of an upstream
 * search request and every other request with `400 Bad Request`.
 *
 * Implementors supply the bind address and the actor system to run on.
 */
trait Proxy {
  /** Interface the HTTP server binds to. */
  def interface: String

  /** Port the HTTP server binds to. */
  def port: Int

  /** Actor system used for the HTTP extension and for materializing streams. */
  def systemRef: ActorSystem

  /**
   * Binds the HTTP server on `interface`:`port` and installs the
   * request-handling graph on the binding.
   *
   * Declared without a parameter list so that existing parameterless call
   * sites keep compiling; the explicit `: Unit =` replaces the deprecated
   * procedure syntax.
   */
  def startService: Unit = {
    // Explicit type on the implicit so resolution does not depend on inference.
    implicit val materializer: FlowMaterializer = FlowMaterializer()( systemRef )
    // NOTE(review): nothing in this block passes the timeout explicitly; kept in
    // case an implicit parameter picks it up — confirm before removing.
    implicit val askTimeout: Timeout = 500.millis

    val serverBinding = Http( systemRef ).
      bind(
        interface = interface,
        port = port )

    // Answers any request with a fixed 400 response.
    val badRequestFlow: Flow[HttpRequest, HttpResponse] =
      Flow[HttpRequest].map( _ ⇒ HttpResponse( BadRequest, entity = "Bad Request" ) )

    // Open ends of the partial graph; attached to the server binding below.
    val in = UndefinedSource[HttpRequest]
    val out = UndefinedSink[HttpResponse]

    // Discards the incoming request and substitutes a fixed upstream query.
    val proxyTo = Flow[HttpRequest].map( _ ⇒ HttpRequest( GET, uri = "/?q=akka+streams" ) )

    val googleSearch: Flow[HttpRequest, HttpResponse] =
      Http( systemRef ).outgoingConnection( "www.google.com" ).flow

    // Each request is sent through its own run of the outgoing-connection flow
    // and the first response is taken as the result.
    val upstream: Flow[HttpRequest, HttpResponse] =
      Flow[HttpRequest].mapAsync { req ⇒
        Source.single( req ).via( googleSearch ).runWith( Sink.head )
      }

    // Tags each request: `GET /proxy` is a success branch, anything else a failure.
    val routeBranch: Flow[HttpRequest, Branch[HttpRequest]] = Flow[HttpRequest].map {
      case req @ HttpRequest( GET, Uri.Path( "/proxy" ), _, _, _ ) ⇒ Branch( true, req )
      case other ⇒ Branch( false, other )
    }

    val partialBranch = PartialFlowGraph {
      implicit b ⇒
        import FlowGraphImplicits._
        val route = new BranchRoute[HttpRequest]
        val merge = MergePreferred[HttpResponse]

        in ~> routeBranch ~> route.in
        // Success branch: rewrite the request, call upstream, merge the response.
        route.success ~> proxyTo ~> upstream ~> merge
        // Failure branch: feeds the preferred input of the merge.
        route.failure ~> badRequestFlow ~> merge.preferred
        merge ~> out
    }

    serverBinding.startHandlingWith( partialBranch.toFlow( in, out ) )
  }
}
/**
 * Routing envelope pairing a message with the outcome of a routing decision.
 *
 * @param success whether the message belongs on the success branch
 * @param message the wrapped element
 * @tparam T type of the wrapped element
 */
final case class Branch[T]( success: Boolean, message: T )
/**
 * Two-way router for [[Branch]] elements.
 *
 * Each incoming branch is unwrapped and its message is emitted on `success`
 * when the branch flag is true and on `failure` otherwise.
 *
 * @tparam T type of the message carried by the routed branches
 */
class BranchRoute[T] extends FlexiRoute[Branch[T]] {
  import akka.stream.scaladsl.FlexiRoute._

  /** Output port for messages of branches flagged as successful. */
  val success = createOutputPort[T]()

  /** Output port for messages of all other branches. */
  val failure = createOutputPort[T]()

  override def createRouteLogic = new RouteLogic[Branch[T]] {
    override def outputHandles( outputCount: Int ) = Vector( success, failure )

    // Single state, re-entered after every element.
    override def initialState = State[T]( DemandFromAny( success, failure ) ) {
      case ( ctx, _, Branch( isSuccess, payload ) ) ⇒
        val target = if ( isSuccess ) success else failure
        ctx.emit( target, payload )
        SameState
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Updated to handle upstream connection in subflow