Last active
August 29, 2015 14:10
-
-
Save privateblue/5adae61fee776138613b to your computer and use it in GitHub Desktop.
Every time I run two instances of the code below, one instance connecting to the other, I get a StackOverflowError (see it in a comment below)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.actor.Status | |
import akka.pattern.ask | |
import akka.pattern.AskTimeoutException | |
import akka.util.Timeout | |
import akka.util.ByteString | |
import akka.io.IO | |
import akka.http.Http | |
import akka.http.model._ | |
import HttpMethods._ | |
import akka.stream.scaladsl.Source | |
import akka.stream.scaladsl.Sink | |
import akka.stream.FlowMaterializer | |
import com.typesafe.config.ConfigFactory | |
import scala.concurrent.Future | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration._ | |
import scala.collection.immutable._ | |
/** | |
* Run two instances of this, one bound to port 8081, and request /in on the other one. That will connect it | |
* to the first instance, request the stream at localhost:8081/out, and respond with that stream. | |
*/ | |
object stream { | |
implicit val askTimeout: Timeout = 500.millis | |
implicit val system = ActorSystem("stream", ConfigFactory.parseString("akka.loglevel = OFF")) | |
import system.dispatcher | |
val interface = "localhost" | |
implicit val materializer = FlowMaterializer() | |
def main(args: Array[String]) = { | |
val port = if (args.isEmpty) 8080 else args(0).toInt | |
val binding = IO(Http) ? Http.Bind(interface = interface, port = port) | |
binding.onSuccess { | |
case Http.ServerBinding(_, connectionStream) => | |
Source(connectionStream).foreach { | |
case Http.IncomingConnection(remoteAddress, requestProducer, responseConsumer) => | |
Source(requestProducer).mapAsyncUnordered { | |
case req @ HttpRequest(_, Uri.Path("/out"), _, _, _) => out(req) | |
case req @ HttpRequest(_, Uri.Path("/in"), _, _, _) => in(req) | |
}.to(Sink(responseConsumer)).run() | |
} | |
println(s"Listening at $interface:$port, press RETURN to stop...") | |
Console.readLine | |
system.shutdown | |
} | |
binding.onFailure { | |
case e: AskTimeoutException => | |
println(s"An error occured: ${e.getMessage}") | |
system.shutdown | |
} | |
} | |
/** | |
* Responds with a stream of natural numbers in JSON, as a chunked response. | |
*/ | |
def out(request: HttpRequest): Future[HttpResponse] = Future { | |
HttpResponse(entity = HttpEntity.Chunked( | |
contentType = ContentTypes.`application/json`, | |
chunks = Source(Stream.from(0)).map(n => ByteString(s"""{"n":"$n"}""")) | |
)) | |
} | |
/** | |
* Connects to localhost:8081/out, requests the stream of natural numbers, and | |
* responds with this same stream, as a chunked response. | |
*/ | |
def in(request: HttpRequest): Future[HttpResponse] = { | |
val req = HttpRequest(GET, uri = "/out") | |
val connection = IO(Http) ? Http.Connect("localhost", 8081) | |
val resp = connection flatMap { | |
case Http.OutgoingConnection(_, _, responsePublisher, requestSubscriber) => | |
Source(Iterable((req, ()))).to(Sink(requestSubscriber)).run() | |
val responses = Source(responsePublisher).fold(Seq.empty[HttpResponse])((acc, in) => in._1 +: acc) | |
responses.map(_.headOption.get) | |
case Status.Failure(e: Throwable) => | |
Future.failed(e) | |
} | |
resp map { response => | |
val HttpResponse(_, _, HttpEntity.Chunked(_, chunks), _) = response | |
HttpResponse(entity = HttpEntity.Chunked( | |
contentType = ContentTypes.`application/json`, | |
chunks = chunks | |
)) | |
} | |
} | |
} |
Author
privateblue
commented
Nov 25, 2014
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment