Skip to content

Instantly share code, notes, and snippets.

@mbarton
Created July 21, 2014 16:51
Show Gist options
  • Save mbarton/a3a484ce51d3e4057cd9 to your computer and use it in GitHub Desktop.
Save mbarton/a3a484ce51d3e4057cd9 to your computer and use it in GitHub Desktop.
Akka HTTP - nested flow (still hangs)
import java.nio.charset.Charset
import akka.actor.{Actor, ActorSystem, Props}
import akka.http.Http
import akka.http.model.{HttpResponse, HttpRequest}
import akka.io.IO
import akka.stream.scaladsl.Flow
import akka.stream.{FlowMaterializer, MaterializerSettings, Transformer}
import akka.util.ByteString
import scala.collection.immutable.Seq
class TestServer extends Actor {
implicit val execContext = context.dispatcher
val materializer = FlowMaterializer(MaterializerSettings())
val requestBuffer = new Transformer[HttpRequest, ByteString]() {
var buffer = ByteString.newBuilder
override def onNext(element: HttpRequest): Seq[ByteString] = ???
}
override def preStart() = {
IO(Http)(context.system) ! Http.Bind("0.0.0.0", 3030)
}
def receive = {
case Http.ServerBinding(_, incoming) =>
Flow(incoming).foreach { self ! _ }.consume(materializer)
case Http.IncomingConnection(remote, in, out) =>
Flow(in).map { req =>
Flow(req.entity.dataBytes(materializer)).transform(new Transformer[ByteString, ByteString] {
val buffer = ByteString.newBuilder
override def onNext(element: ByteString): Seq[ByteString] = {
buffer ++= element
Nil
}
override def onTermination(e: Option[Throwable]): Seq[ByteString] = {
Seq(buffer.result())
}
}).map { data =>
val charData = Charset.forName("UTF-8").decode(data.asByteBuffer)
println(s"-> $charData")
HttpResponse(entity = data)
}.produceTo(materializer, out)
}.consume(materializer)
}
}
object TestServerApp extends App {
val sys = ActorSystem()
sys.actorOf(Props[TestServer])
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment