Skip to content

Instantly share code, notes, and snippets.

@rossabaker
Created June 23, 2020 02:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rossabaker/0934df905f2a11d3b658a9df66a0b6eb to your computer and use it in GitHub Desktop.
Save rossabaker/0934df905f2a11d3b658a9df66a0b6eb to your computer and use it in GitHub Desktop.
/*
* Copyright 2013-2020 http4s.org
*
* SPDX-License-Identifier: Apache-2.0
*/
import java.util.Calendar
import cats.effect._
import org.asynchttpclient._
import org.http4s.{HttpRoutes, Method, Request, Status, Uri}
import org.http4s.dsl.io._
import org.http4s.implicits._
import org.http4s.server.blaze._
import fs2.Stream
import fs2.text.utf8DecodeC
import org.http4s.client.Client
import org.http4s.client.asynchttpclient.AsyncHttpClient
import org.http4s.server.Server
import scala.concurrent.duration._
import org.specs2.mutable.Specification
import scala.concurrent.ExecutionContext
import java.util.concurrent.TimeoutException
class Issue3509 extends Specification {
sequential
val singleThreadExecutor = ExecutionContext.fromExecutor((task: Runnable) => task.run())
implicit val contextShift = IO.contextShift(singleThreadExecutor)
implicit val timer = IO.timer(singleThreadExecutor)
val host = "localhost"
val port = 4002
val uri = Uri.unsafeFromString(s"http://$host:$port/test")
val hang = Stream.awakeEvery[IO](10.seconds).take(5)
val hangService = HttpRoutes.of[IO] {
case GET -> Root / "test" => Ok(hang.map(_.toString))
case POST -> Root / "test" => Ok(hang.map(_.toString))
}.orNotFound
def runServer(f: (Server[IO]) => IO[List[String]]): IO[List[String]] = {
BlazeServerBuilder[IO](ExecutionContext.global)
.withIdleTimeout(Duration.Inf)
.bindHttp(port, host)
.withHttpApp(hangService)
.resource.use(f)
}
def runClient(f: (Client[IO]) => IO[List[String]]): IO[List[String]] = {
val defaultConfig = AsyncHttpClient.defaultConfig
val config = new DefaultAsyncHttpClientConfig.Builder()
.setMaxConnectionsPerHost(defaultConfig.getMaxConnectionsPerHost)
.setMaxConnections(defaultConfig.getMaxConnections)
.setRequestTimeout(5 * 1000)
// .setReadTimeout(240*1000) // 4 minutes
.setThreadFactory(defaultConfig.getThreadFactory)
.setCookieStore(defaultConfig.getCookieStore)
.build()
AsyncHttpClient.resource[IO](config).use(f)
}
def runRequest(client: Client[IO]): IO[List[String]] = {
client.stream(Request[IO](Method.GET, uri)).flatMap { response =>
Console.println((Calendar.getInstance().getTime, response))
response.status match {
case Status.Ok => response.body.chunks.through(utf8DecodeC)
case _ => Stream.raiseError[IO](new Exception(response.status.reason))
}
}.compile.toList
}
"should timeout" in {
val result = runServer { _ =>
runClient { client =>
runRequest(client)
}
}
result.unsafeRunSync() must throwA[TimeoutException]
// Expected exception java.lang.Exception to be thrown, but no exception was thrown
}
"should succeed" in {
val result = runServer { _ =>
runClient { client =>
runRequest(client)
}
}
result.unsafeRunSync() must throwA[TimeoutException]
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment