Skip to content

Instantly share code, notes, and snippets.

@jgongo
Created August 6, 2014 15:44
Show Gist options
  • Save jgongo/dbf458d1c91027ea3f3a to your computer and use it in GitHub Desktop.
Save jgongo/dbf458d1c91027ea3f3a to your computer and use it in GitHub Desktop.
This is a ToResponseMarshaller that streams a BinaryResponse object (a content type plus an Enumerator[Array[Byte]]) using Spray chunked response streaming. I needed this to serve files stored in a MongoDB database through GridFS in a non-blocking manner.
/**
 * A binary payload paired with the content type it should be served as.
 *
 * @param contentType content type to attach to the payload
 * @param data        the payload itself, produced as a sequence of byte-array chunks
 */
case class BinaryResponse(
  contentType: ContentType,
  data: Enumerator[Array[Byte]]
)
/**
 * Mixes in spray-json protocol support and provides a marshaller that streams a
 * `BinaryResponse` to the client as a chunked HTTP response.
 */
trait SharedJSONFormats extends DefaultJsonProtocol with MetaMarshallers with SprayJsonSupport {

  /**
   * Marshals a `BinaryResponse` by feeding its enumerator, chunk by chunk, to a dedicated
   * actor that writes each chunk to the response and waits for the write acknowledgement
   * before asking for the next one.
   *
   * @param actorRefFactory  factory used to create the per-response streaming actor
   * @param executionContext runs the future callbacks used in this method (`map`, `onFailure`)
   * @param timeout          how long each `ask` waits for a chunk to be acknowledged
   * @return a marshaller for `BinaryResponse` values
   */
  implicit def binaryResponseMarshaller(implicit actorRefFactory: ActorRefFactory, executionContext: ExecutionContext, timeout: Timeout): ToResponseMarshaller[BinaryResponse] =
    ToResponseMarshaller[BinaryResponse] { (value, toResponseMarshallingContext) =>

      // One actor per response. It moves through three states:
      //   receive             -> nothing sent yet, waiting for the first chunk
      //   waitingForResponder -> a chunk was written, waiting for its ack
      //   waitingForData      -> last chunk acked, waiting for the next chunk
      val responseStreamerActor: ActorRef = actorRefFactory.actorOf {
        Props {
          new Actor with ActorLogging {

            /** Ack token attached to every response part and echoed back to the iteratee. */
            case object ChunkSent

            /** Reports the failure through the marshalling context and stops this actor. */
            def abort(error: Throwable): Unit = {
              // NOTE(review): when the chunked response has already started, handleError may
              // not be able to replace it with an error response - confirm against spray.
              toResponseMarshallingContext.handleError(error)
              context.stop(self)
            }

            /** Logs the connection close event and stops this actor. */
            def connectionClosed(event: Http.ConnectionClosed): Unit = {
              log.warning("Binary response streaming stopped due to {}", event)
              context.stop(self)
            }

            // Initial state: no response part has been sent yet.
            def receive = {
              case data: Array[Byte] =>
                // The first chunk opens the chunked response; `sender` is the pending ask
                // that must be answered once the chunk has been acknowledged.
                val responder = toResponseMarshallingContext.startChunkedMessage(
                  HttpResponse(entity = HttpEntity(value.contentType, data)),
                  ack = Some(ChunkSent))
                context.become(waitingForResponder(responder, sender))
              case Input.EOF =>
                // The enumerator produced no data at all: reply with an empty entity.
                toResponseMarshallingContext.marshalTo(HttpResponse(entity = HttpEntity(value.contentType, HttpData.Empty)))
                context.stop(self)
              case error: Throwable =>
                abort(error)
            }

            // The previous chunk has been acknowledged; waiting for more input.
            def waitingForData(responder: ActorRef): Actor.Receive = {
              case data: Array[Byte] =>
                responder ! MessageChunk(data).withAck(ChunkSent)
                context.become(waitingForResponder(responder, sender))
              case Input.EOF =>
                responder ! ChunkedMessageEnd
                context.stop(self)
              case error: Throwable =>
                abort(error)
              case event: Http.ConnectionClosed =>
                // Previously unhandled in this state, which left the actor alive with no
                // way of ever being stopped once the connection was gone.
                connectionClosed(event)
            }

            // A chunk has been written; waiting for its acknowledgement.
            def waitingForResponder(responder: ActorRef, requestor: ActorRef): Actor.Receive = {
              case ChunkSent =>
                // Completing the ask lets the iteratee accept the next chunk.
                requestor ! ChunkSent
                context.become(waitingForData(responder))
              case event: Http.ConnectionClosed =>
                connectionClosed(event)
              case error: Throwable =>
                // Reached when the stream fails while an ack is pending (for instance the
                // ask timing out); previously unhandled, so the actor was never stopped.
                abort(error)
            }
          }
        }
      }

      /**
       * Builds an iteratee that forwards every chunk to the actor and only continues once
       * the actor has replied, so at most one chunk is in flight at a time.
       */
      def byteArrayIteratee(responseStreamerActor: ActorRef): Iteratee[Array[Byte], Unit] = {
        def continuation(input: Input[Array[Byte]]): Iteratee[Array[Byte], Unit] = input match {
          case Input.Empty =>
            Cont[Array[Byte], Unit](continuation)
          case Input.El(array) =>
            Iteratee.flatten((responseStreamerActor ? array) map (_ => Cont[Array[Byte], Unit](continuation)))
          case Input.EOF =>
            responseStreamerActor ! Input.EOF
            Done((), Input.EOF)
        }
        Cont[Array[Byte], Unit](continuation)
      }

      // Run the enumerator into the iteratee; any non-fatal failure is forwarded to the
      // actor so it can report the error and stop.
      val streaming = value.data |>>> byteArrayIteratee(responseStreamerActor)
      streaming onFailure {
        case NonFatal(error) => responseStreamerActor ! error
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment