Created
July 21, 2014 16:51
-
-
Save mbarton/a3a484ce51d3e4057cd9 to your computer and use it in GitHub Desktop.
Akka HTTP - nested flow (still hangs)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.nio.charset.Charset | |
import akka.actor.{Actor, ActorSystem, Props} | |
import akka.http.Http | |
import akka.http.model.{HttpResponse, HttpRequest} | |
import akka.io.IO | |
import akka.stream.scaladsl.Flow | |
import akka.stream.{FlowMaterializer, MaterializerSettings, Transformer} | |
import akka.util.ByteString | |
import scala.collection.immutable.Seq | |
class TestServer extends Actor { | |
implicit val execContext = context.dispatcher | |
val materializer = FlowMaterializer(MaterializerSettings()) | |
val requestBuffer = new Transformer[HttpRequest, ByteString]() { | |
var buffer = ByteString.newBuilder | |
override def onNext(element: HttpRequest): Seq[ByteString] = ??? | |
} | |
override def preStart() = { | |
IO(Http)(context.system) ! Http.Bind("0.0.0.0", 3030) | |
} | |
def receive = { | |
case Http.ServerBinding(_, incoming) => | |
Flow(incoming).foreach { self ! _ }.consume(materializer) | |
case Http.IncomingConnection(remote, in, out) => | |
Flow(in).map { req => | |
Flow(req.entity.dataBytes(materializer)).transform(new Transformer[ByteString, ByteString] { | |
val buffer = ByteString.newBuilder | |
override def onNext(element: ByteString): Seq[ByteString] = { | |
buffer ++= element | |
Nil | |
} | |
override def onTermination(e: Option[Throwable]): Seq[ByteString] = { | |
Seq(buffer.result()) | |
} | |
}).map { data => | |
val charData = Charset.forName("UTF-8").decode(data.asByteBuffer) | |
println(s"-> $charData") | |
HttpResponse(entity = data) | |
}.produceTo(materializer, out) | |
}.consume(materializer) | |
} | |
} | |
object TestServerApp extends App { | |
val sys = ActorSystem() | |
sys.actorOf(Props[TestServer]) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment