Skip to content

Instantly share code, notes, and snippets.

@adamw
Last active February 23, 2019 01:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save adamw/e5e391f49153aac37a57fab795896f1a to your computer and use it in GitHub Desktop.
Save adamw/e5e391f49153aac37a57fab795896f1a to your computer and use it in GitHub Desktop.
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.RouteResult.Complete
import akka.http.scaladsl.server.{Directive0, RouteResult}
import akka.stream.scaladsl.Flow
import akka.util.ByteString
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
/** Directives that run user code around the handling of a single HTTP request. */
object AroundDirectives {
  import java.util.concurrent.atomic.AtomicBoolean

  /** Response used (and reported to the completion callback) when the request timeout handler runs. */
  val timeoutResponse: HttpResponse =
    HttpResponse(StatusCodes.NetworkReadTimeout, entity = "Unable to serve response within time limit.")

  /** Wraps `callback` so that only the first invocation is forwarded; later ones are dropped.
    *
    * Several independent paths (timeout handler, route result, entity stream termination,
    * failed future) can each report completion of the same request, possibly from
    * different threads, hence the atomic flag.
    */
  private def invokeOnce(callback: Try[RouteResult] => Unit): Try[RouteResult] => Unit = {
    val alreadyFired = new AtomicBoolean(false)
    (result: Try[RouteResult]) => if (alreadyFired.compareAndSet(false, true)) callback(result)
  }

  /** Builds a directive that calls `onRequest` with the incoming request and later calls the
    * function it returned with the outcome of the request.
    *
    * The outcome is reported at most once per request, from whichever of these happens first:
    *  - the request-timeout response handler runs: `Success(Complete(timeoutResponse))`;
    *  - the inner route completes with an entity that is known to be empty: `Success` of that result;
    *  - the inner route completes with a non-empty entity: reported when the entity's data
    *    stream terminates (`Success` of the result, or `Failure` if the stream failed);
    *  - the inner route yields any other `RouteResult`: `Success` of that result;
    *  - the inner route's future fails: `Failure` with the exception.
    *
    * @param onRequest invoked with the request when the directive is run; the function it
    *                  returns is the completion callback
    * @param ec        execution context used for the future callbacks registered here
    * @return a `Directive0` wrapping the inner route
    */
  def aroundRequest(onRequest: HttpRequest => Try[RouteResult] => Unit)(implicit ec: ExecutionContext): Directive0 =
    extractRequestContext.flatMap { ctx =>
      // Guarded: the inner route may still complete after the timeout handler has already
      // reported a result, which would otherwise notify the callback a second time.
      val onDone = invokeOnce(onRequest(ctx.request))
      mapInnerRoute { inner =>
        withRequestTimeoutResponse(
          _ => {
            onDone(Success(Complete(timeoutResponse)))
            timeoutResponse
          }
        ) {
          inner.andThen { resultFuture =>
            resultFuture
              .map {
                case c @ Complete(response) =>
                  Complete(response.mapEntity { entity =>
                    if (entity.isKnownEmpty()) {
                      // On an empty entity, `transformDataBytes` unsets `isKnownEmpty`.
                      // Call onDone right away, since there's no significant amount of
                      // data to send, anyway.
                      onDone(Success(c))
                      entity
                    } else {
                      // Report completion only once the entity's data stream has terminated,
                      // propagating a stream failure as `Failure`.
                      entity.transformDataBytes(Flow[ByteString].watchTermination() { (mat, terminated) =>
                        terminated.map(_ => c).onComplete(onDone)
                        mat
                      })
                    }
                  })
                case other =>
                  onDone(Success(other))
                  other
              }
              .andThen { // skip this if you use akka.http.scaladsl.server.handleExceptions, put onDone there
                case Failure(ex) =>
                  onDone(Failure(ex))
              }
          }
        }
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment