Skip to content

Instantly share code, notes, and snippets.

@magnusart
Last active August 29, 2015 14:11
Show Gist options
  • Save magnusart/bffe1a15dab30f794235 to your computer and use it in GitHub Desktop.
Save magnusart/bffe1a15dab30f794235 to your computer and use it in GitHub Desktop.
package proxy
import scala.concurrent.duration._
import scala.concurrent.Future
import akka.io.IO
import akka.http.Http
import akka.http.Http.OutgoingConnection
import akka.stream.scaladsl._
import akka.stream.FlattenStrategy
import akka.stream.scaladsl.FlowGraphImplicits._
import akka.stream.{ MaterializerSettings, FlowMaterializer }
import akka.actor.ActorSystem
import akka.util.Timeout
import scala.util.{ Failure, Success, Try }
import akka.http.model.HttpMethods._
import akka.http.model.StatusCodes._
import akka.http.model._
import com.typesafe.config.{ ConfigFactory, Config }
object ProxyService extends App with Proxy {
val configFile = """
|akka {
| event-handlers = ["akka.event.Logging$DefaultLogger"]
| loglevel = "DEBUG"
| actor {
| provider = "akka.actor.LocalActorRefProvider"
| }
| log-dead-letters-during-shutdown = false
| log-dead-letters = 0
|}
|proxy {
| service {
| name = "ProxySystem"
| http.interface = 127.0.0.1
| http.port = 7080
| }
|}""".stripMargin
val config = ConfigFactory.parseString( configFile )
val interface = config.getString( "proxy.service.http.interface" )
val systemName = config.getString( "proxy.service.name" )
val port = config.getInt( "proxy.service.http.port" ) // Resolve HTTP-port
val systemRef = ActorSystem( systemName, config ) // Start actor system
this.startService // Start application
}
trait Proxy {
def interface: String
def port: Int
def systemRef: ActorSystem
def startService {
implicit val materializer = FlowMaterializer()( systemRef )
implicit val askTimeout: Timeout = 500.millis
val serverBinding = Http( systemRef ).
bind(
interface = interface,
port = port )
val badRequestFlow: Flow[HttpRequest, HttpResponse] =
Flow[HttpRequest].map( _ ⇒ HttpResponse( BadRequest, entity = "Bad Request" ) )
val in = UndefinedSource[HttpRequest]
val out = UndefinedSink[HttpResponse]
val proxyTo = Flow[HttpRequest].map( _ ⇒ HttpRequest( GET, uri = "/?q=akka+streams" ) )
val googleSearch: Flow[HttpRequest, HttpResponse] = Http( systemRef ).outgoingConnection( "www.google.com" ).flow
val upstream: Flow[HttpRequest, HttpResponse] =
Flow[HttpRequest].mapAsync { req ⇒
Source.single( req ).via( googleSearch ).runWith( Sink.head )
}
val routeBranch: Flow[HttpRequest, Branch[HttpRequest]] = Flow[HttpRequest].map {
case req @ HttpRequest( GET, Uri.Path( "/proxy" ), _, _, _ ) ⇒ Branch( true, req )
case other ⇒ Branch( false, other )
}
val partialBranch = PartialFlowGraph {
implicit b ⇒
import FlowGraphImplicits._
val route = new BranchRoute[HttpRequest]
val merge = MergePreferred[HttpResponse]
in ~> routeBranch ~> route.in
route.success ~> proxyTo ~> upstream ~> merge
route.failure ~> badRequestFlow ~> merge.preferred
merge ~> out
}
serverBinding.startHandlingWith( partialBranch.toFlow( in, out ) )
}
}
case class Branch[T]( success: Boolean, message: T )
class BranchRoute[T] extends FlexiRoute[Branch[T]] {
import akka.stream.scaladsl.FlexiRoute._
val success = createOutputPort[T]()
val failure = createOutputPort[T]()
override def createRouteLogic = new RouteLogic[Branch[T]] {
override def outputHandles( outputCount: Int ) = Vector( success, failure )
override def initialState = State[T]( DemandFromAny( success, failure ) ) {
case ( ctx, _, Branch( successState, element ) ) ⇒
if ( successState ) ctx.emit( success, element )
else ctx.emit( failure, element )
SameState
}
}
}
@magnusart
Copy link
Author

Updated för 1.0-M2

@magnusart
Copy link
Author

Updated to handle upstream connection in subflow

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment