Skip to content

Instantly share code, notes, and snippets.

@sam
Created March 14, 2016 17:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sam/7731f883a62b329c6592 to your computer and use it in GitHub Desktop.
Save sam/7731f883a62b329c6592 to your computer and use it in GitHub Desktop.
Trying to figure out how to clean up actor-per-request actors with akka-http.
// Just inlining the revelant bits of https://github.com/hseeberger/akka-http-json/blob/master/akka-http-json4s/src/main/scala/de/heikoseeberger/akkahttpjson4s/Json4sSupport.scala#L44
// for the gist.
trait Json4sSupport {
implicit def json4sUnmarshallerConverter[A: Manifest](serialization: Serialization, formats: Formats): FromEntityUnmarshaller[A] =
json4sUnmarshaller(manifest, serialization, formats)
/**
* HTTP entity => `A`
*
* @tparam A type to decode
* @return unmarshaller for `A`
*/
implicit def json4sUnmarshaller[A: Manifest](implicit serialization: Serialization, formats: Formats): FromEntityUnmarshaller[A] =
Unmarshaller
.byteStringUnmarshaller
.forContentTypes(MediaTypes.`application/json`)
.mapWithCharset { (data, charset) =>
val input = if (charset == HttpCharsets.`UTF-8`) data.utf8String else data.decodeString(charset.nioCharset.name)
serialization.read(input)
}
}
// This is basically just the example provided in the Akka documentation AFAICT:
// http://doc.akka.io/docs/akka/2.4.2/scala/http/client-side/request-level.html#Using_the_Future-Based_API_in_Actors
//
// The goal being the same Actor-Instance-per-Request model demonstrated there using preStart
// to fire off the request and receive to collect the HttpResponse.
//
// What the example doesn't show is how to clean up the actor after you're done. Which is
// the part I'm having trouble with. The NotFound case appears to work fine, but if I
// attempt to PoisonPill the Actor after the JObject Unmarshal case, I get an AbruptTerminationException.
class GetDocumentActor(requestor: ActorRef, id: String) extends Actor with Json4sSupport {
// Just a little helper to allow `uri / "segment"`.
implicit class UriBuilder(uri: Uri) {
def /(segment: String): Uri = {
uri.withPath(uri.path + "/" + segment)
}
}
import context.dispatcher
// Setup. You can pretty much ignore this. It's just to connect to a CouchDB database.
val config: Config = context.system.settings.config
val credentials: BasicHttpCredentials = BasicHttpCredentials(config.getString("cloudant.username"),
config.getString("cloudant.password"))
val authorization: Authorization = Authorization(credentials)
val databaseUri: Uri = Uri(s"https://${config.getString("cloudant.hostname")}") / config.getString("cloudant.database")
implicit val materializer = ActorMaterializer()
// JavaTimeSerializers added from json4s PR243 to add Java 8 support.
implicit val formats = DefaultFormats.lossless ++ JavaTimeSerializers.all
// Use json4s Jackson parser support since the json4s native parser is broken in 3.3.x
implicit val serialization = jackson.Serialization
val http = Http(context.system)
// Using preStart to fire off our request.
@throws[Exception](classOf[Exception])
override def preStart(): Unit = {
// Fire off a singleRequest and pipe the response with forwarding for the requestor, which may not be parent.
http.singleRequest(HttpRequest(GET, uri = databaseUri / id, headers = List(authorization))).pipeTo(context.self)(requestor)
}
def receive: Receive = {
// If we get a NotFound, consume the response, forward it to the sender,
// then pipe the Done back to ourself.
case HttpResponse(NotFound, _, entity, _) =>
sender ! NotFound
entity.dataBytes.runWith(Sink.ignore) pipeTo context.self
// If we found the document, deserialize it as a JObject,
// and then send ourself a Done message to trigger an eventual stop.
case HttpResponse(OK, _, entity, _) =>
val self = context.self
Unmarshal(entity.withContentType(`application/json`)).to[JObject]
.pipeTo(sender)
// NOTE!!! Comment out the following line and everything "works".
// Though obviously you'll leak Actors that way as it'll never get
// cleaned up. Attempt to clean it up like this, and you'll get an
// AbruptTerminationException however.
.onComplete { case _ => self ! Done }
// If we get a Done, poison ourself. This appears to work fine for the
// NotFound case, but regularly (always?) causes an AbruptTerminationException
// for the OK case.
case Done => context.self ! PoisonPill
}
}
@sam
Copy link
Author

sam commented Mar 14, 2016

The scalatest that exposes the failure is just:

"users" must {
  "get" in {
    Get("/users/test-user") ~> route ~> check {
      status must be (OK)
      val user = responseAs[User]
      user.email must equal("me@example.com")
      user.hashedPassword must equal("s3cret")
      user.admin must be(false)
    }
  }

  "fail to get" in {
    Get("/users/especiallyFakeUserId") ~> route ~> check {
      status must be (NotFound)
    }
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment