Skip to content

Instantly share code, notes, and snippets.

@davidmweber
Created May 4, 2016 13:11
Show Gist options
  • Save davidmweber/8c8f0a5ce8af61b6cbfc4f3ab1e9b73b to your computer and use it in GitHub Desktop.
Save davidmweber/8c8f0a5ce8af61b6cbfc4f3ab1e9b73b to your computer and use it in GitHub Desktop.
/**
* Copyright © 2016 8eo Inc.
*/
package ws
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding._
import akka.http.scaladsl.model.{HttpRequest, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}
object TimeoutBug extends App {
implicit val system = ActorSystem("Test")
implicit val materializer = ActorMaterializer()
val host = "localhost"
val port = 10101
val route = get { // Just a slow response
path("slow") {
Thread.sleep(2000)
complete(StatusCodes.OK)
}
}
val decider: Supervision.Decider = {
case error ⇒
println(s"Supervisor received an error: $error") // Never gets here either
Supervision.Stop
}
val reqFlow = Flow[(HttpRequest, Int)]
.via(Http().cachedHostConnectionPool[Int](host, port, ConnectionPoolSettings(system)))
.completionTimeout(1 seconds) // TimeoutExceptions are not caught properly
.withAttributes(ActorAttributes.supervisionStrategy(decider))
val server = Http().bindAndHandle(route, host, port)
Await.result(server, 1 second)
val req = Source.single(Get("/slow") → 0)
.via(reqFlow)
.runWith(Sink.head)
.flatMap {
case (Success(s), _) ⇒ Future.successful(s)
case (Failure(f), _) ⇒ Future.failed(f)
}
req.onComplete {
case Failure(error) ⇒ println(s"Failure was $error") // Never catches the TimeoutException
case Success(r) ⇒ println(s"Success: ${r.status}")
}
Await.result(req, 5 seconds)
server.map(_.unbind().onComplete(_ => system.shutdown()))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment