Skip to content

Instantly share code, notes, and snippets.

@privateblue
Last active August 29, 2015 14:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save privateblue/5adae61fee776138613b to your computer and use it in GitHub Desktop.
Save privateblue/5adae61fee776138613b to your computer and use it in GitHub Desktop.
Every time I run two instances of the code below, one instance connecting to the other, I get a StackOverflowError (see it in a comment below)
import akka.actor.ActorSystem
import akka.actor.Status
import akka.pattern.ask
import akka.pattern.AskTimeoutException
import akka.util.Timeout
import akka.util.ByteString
import akka.io.IO
import akka.http.Http
import akka.http.model._
import HttpMethods._
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink
import akka.stream.FlowMaterializer
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.collection.immutable._
/**
* Run two instances of this, one bound to port 8081, and request /in on the other one. That will connect it
* to the first instance, request the stream at localhost:8081/out, and respond with that stream.
*/
object stream {
  // Local imports keep this object self-contained without touching the file-level import list.
  import scala.util.Try
  import scala.util.control.NonFatal

  /** Port the server binds to when no port is given on the command line. */
  private val DefaultPort = 8080

  /** Host and port of the instance whose `/out` stream is relayed by `/in`. */
  private val UpstreamHost = "localhost"
  private val UpstreamPort = 8081

  implicit val askTimeout: Timeout = 500.millis
  implicit val system: ActorSystem =
    ActorSystem("stream", ConfigFactory.parseString("akka.loglevel = OFF"))
  import system.dispatcher
  val interface: String = "localhost"
  implicit val materializer: FlowMaterializer = FlowMaterializer()

  /**
   * Entry point. Binds the HTTP server to `interface` on the port given as the first
   * command-line argument (8080 when there is none).
   *
   * The actor system is created when this object is initialised, i.e. before the
   * arguments are looked at, so an unusable port argument must shut the system down
   * explicitly instead of letting an exception escape from `main`.
   *
   * @param args optional first element: the port to listen on
   */
  def main(args: Array[String]): Unit =
    parsePort(args) match {
      case Some(port) =>
        bind(port)
      case None =>
        println(s"Invalid port: ${args.mkString(" ")}")
        system.shutdown()
    }

  /** Returns the requested port, the default when no argument is given, or None if it is unusable. */
  private def parsePort(args: Array[String]): Option[Int] =
    args.headOption match {
      case None      => Some(DefaultPort)
      case Some(arg) => Try(arg.toInt).toOption.filter(p => p >= 0 && p <= 65535)
    }

  /** Binds the server to `port`, serves connections until RETURN is pressed, then shuts down. */
  private def bind(port: Int): Unit = {
    val binding = IO(Http) ? Http.Bind(interface = interface, port = port)
    binding.onSuccess {
      case Http.ServerBinding(_, connectionStream) =>
        Source(connectionStream).foreach {
          case Http.IncomingConnection(_, requestProducer, responseConsumer) =>
            // mapAsync (not mapAsyncUnordered): responses have to leave the connection
            // in the same order the requests arrived.
            Source(requestProducer).mapAsync(route).to(Sink(responseConsumer)).run()
        }
        println(s"Listening at $interface:$port, press RETURN to stop...")
        // This callback runs on the system dispatcher; mark the console read as blocking.
        scala.concurrent.blocking {
          Console.readLine()
        }
        system.shutdown()
      case unexpected =>
        // Any other reply means nothing is bound; do not leave the system running idle.
        println(s"An error occured: unexpected reply to Http.Bind: $unexpected")
        system.shutdown()
    }
    binding.onFailure {
      // Covers the ask timeout as well as any other bind failure.
      case NonFatal(e) =>
        println(s"An error occured: ${e.getMessage}")
        system.shutdown()
    }
  }

  /**
   * Dispatches a request to `out` or `in` by path. Unknown paths get a 404 and a failed
   * upstream relay gets a 502, so a single bad request does not fail the whole
   * connection stream.
   */
  private def route(request: HttpRequest): Future[HttpResponse] = request match {
    case HttpRequest(_, Uri.Path("/out"), _, _, _) =>
      out(request)
    case HttpRequest(_, Uri.Path("/in"), _, _, _) =>
      in(request).recover {
        case NonFatal(e) =>
          Console.err.println(s"Relaying from $UpstreamHost:$UpstreamPort failed: $e")
          HttpResponse(status = StatusCodes.BadGateway)
      }
    case _ =>
      Future.successful(HttpResponse(status = StatusCodes.NotFound))
  }

  /**
   * Responds with a stream of natural numbers in JSON, as a chunked response.
   *
   * @param request the incoming request (not inspected)
   * @return an already-completed future holding the chunked response
   */
  def out(request: HttpRequest): Future[HttpResponse] =
    // The response value is cheap to build, so no asynchronous boundary is needed.
    // NOTE(review): Stream memoises its elements; if the Source keeps a reference to the
    // head of this infinite Stream, memory grows without bound — confirm and consider an
    // iterator-based source.
    Future.successful(
      HttpResponse(entity = HttpEntity.Chunked(
        contentType = ContentTypes.`application/json`,
        chunks = Source(Stream.from(0)).map(n => ByteString(s"""{"n":"$n"}"""))
      ))
    )

  /**
   * Connects to localhost:8081/out, requests the stream of natural numbers, and
   * responds with this same stream, as a chunked response.
   *
   * The returned future fails when the connection cannot be established, when the
   * upstream sends no response at all, or when its response entity is not chunked.
   *
   * @param request the incoming request (not inspected)
   * @return the relayed chunked response
   */
  def in(request: HttpRequest): Future[HttpResponse] = {
    val upstreamRequest = HttpRequest(GET, uri = "/out")
    val connection = IO(Http) ? Http.Connect(UpstreamHost, UpstreamPort)

    val upstreamResponse: Future[HttpResponse] = connection.flatMap {
      case Http.OutgoingConnection(_, _, responsePublisher, requestSubscriber) =>
        Source(Iterable((upstreamRequest, ()))).to(Sink(requestSubscriber)).run()
        // Responses are prepended, so the head of the folded Seq is the most recent one;
        // exactly one request is sent, so at most one response is expected.
        // NOTE(review): the fold presumably completes only once the connection's response
        // stream completes — confirm this is acceptable for a never-ending entity.
        Source(responsePublisher)
          .fold(Seq.empty[HttpResponse])((acc, pair) => pair._1 +: acc)
          .flatMap { received =>
            received.headOption match {
              case Some(response) =>
                Future.successful(response)
              case None =>
                Future.failed(new NoSuchElementException(
                  "Upstream connection completed without sending a response"))
            }
          }
      case Status.Failure(e: Throwable) =>
        // Kept as a safeguard; the ask pattern normally reports this as a failed future.
        Future.failed(e)
      case unexpected =>
        Future.failed(new IllegalStateException(
          s"Unexpected reply to Http.Connect: $unexpected"))
    }

    upstreamResponse.flatMap {
      case HttpResponse(_, _, HttpEntity.Chunked(_, chunks), _) =>
        Future.successful(
          HttpResponse(entity = HttpEntity.Chunked(
            contentType = ContentTypes.`application/json`,
            chunks = chunks
          ))
        )
      case other =>
        // Only a chunked entity can be relayed; report anything else explicitly
        // instead of surfacing an opaque MatchError.
        Future.failed(new IllegalStateException(
          s"Expected a chunked response from upstream, got status ${other.status}"))
    }
  }
}
@privateblue
Copy link
Author

java.lang.StackOverflowError
    at akka.util.ByteString$ByteString1$.apply(ByteString.scala:123)
    at akka.util.ByteString$ByteString1C.toByteString1(ByteString.scala:102)
    at akka.util.ByteString$ByteString1C.slice(ByteString.scala:117)
    at akka.http.engine.parsing.HttpMessageParser.result$1(HttpMessageParser.scala:159)
    at akka.http.engine.parsing.HttpMessageParser.parseChunkBody$1(HttpMessageParser.scala:163)
    at akka.http.engine.parsing.HttpMessageParser.parseSize$1(HttpMessageParser.scala:184)
    at akka.http.engine.parsing.HttpMessageParser.parseChunk(HttpMessageParser.scala:189)
    at akka.http.engine.parsing.HttpMessageParser.result$1(HttpMessageParser.scala:160)
    at akka.http.engine.parsing.HttpMessageParser.parseChunkBody$1(HttpMessageParser.scala:163)
    at akka.http.engine.parsing.HttpMessageParser.parseSize$1(HttpMessageParser.scala:184)
    at akka.http.engine.parsing.HttpMessageParser.parseChunk(HttpMessageParser.scala:189)
    at akka.http.engine.parsing.HttpMessageParser.result$1(HttpMessageParser.scala:160)
    at akka.http.engine.parsing.HttpMessageParser.parseChunkBody$1(HttpMessageParser.scala:163)
    at akka.http.engine.parsing.HttpMessageParser.parseSize$1(HttpMessageParser.scala:184)
    at akka.http.engine.parsing.HttpMessageParser.parseChunk(HttpMessageParser.scala:189)
    at akka.http.engine.parsing.HttpMessageParser.result$1(HttpMessageParser.scala:160)
    at akka.http.engine.parsing.HttpMessageParser.parseChunkBody$1(HttpMessageParser.scala:163)
    at akka.http.engine.parsing.HttpMessageParser.parseSize$1(HttpMessageParser.scala:184)
    at akka.http.engine.parsing.HttpMessageParser.parseChunk(HttpMessageParser.scala:189)
    at akka.http.engine.parsing.HttpMessageParser.result$1(HttpMessageParser.scala:160)
    at akka.http.engine.parsing.HttpMessageParser.parseChunkBody$1(HttpMessageParser.scala:163)
    at akka.http.engine.parsing.HttpMessageParser.parseSize$1(HttpMessageParser.scala:184)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment