Created
August 6, 2014 15:44
-
-
Save jgongo/dbf458d1c91027ea3f3a to your computer and use it in GitHub Desktop.
This is a ToResponseMarshaller that streams a BinaryResponse object (a content type plus an Enumerator[Array[Byte]]) using Spray chunked response streaming. I needed this to serve files stored in a MongoDB database through GridFS in a non-blocking manner.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Binary payload that is marshalled to an HTTP response by streaming its content.
 *
 * @param contentType content type placed on the entity of the HTTP response
 * @param data        enumerator producing the response body as successive byte arrays
 */
final case class BinaryResponse(contentType: ContentType, data: Enumerator[Array[Byte]])
/**
 * Marshalling support shared across routes: the mixed-in JSON protocol traits plus a
 * marshaller that streams a [[BinaryResponse]] as a chunked HTTP response.
 */
trait SharedJSONFormats extends DefaultJsonProtocol with MetaMarshallers with SprayJsonSupport {

  /**
   * Marshals a [[BinaryResponse]] by feeding its `data` enumerator, one byte array at a
   * time, into a chunked HTTP response.
   *
   * A dedicated actor is created per response. The iteratee consuming the enumerator asks
   * the actor for each array and only continues once the actor has replied, which the actor
   * does after the acknowledgement for the corresponding chunk has come back. An empty
   * enumerator produces a regular (non-chunked) response with an empty entity.
   *
   * @param actorRefFactory  factory used to create the per-response streaming actor
   * @param executionContext context on which the ask futures are mapped
   * @param timeout          ask timeout applied to every array handed to the actor
   * @return a marshaller for [[BinaryResponse]] values
   */
  implicit def binaryResponseMarshaller(implicit actorRefFactory: ActorRefFactory,
                                        executionContext: ExecutionContext,
                                        timeout: Timeout): ToResponseMarshaller[BinaryResponse] =
    ToResponseMarshaller[BinaryResponse] { (value, toResponseMarshallingContext) =>
      val responseStreamerActor: ActorRef = actorRefFactory.actorOf {
        Props {
          new Actor with ActorLogging {

            /** Acknowledgement token: requested from the responder and echoed to the iteratee. */
            case object ChunkSent

            /** Reports `error` through the marshalling context and terminates this actor. */
            def failWith(error: Throwable): Unit = {
              // NOTE(review): once the chunked message has been started the error response
              // may no longer be deliverable to the client - confirm against spray behaviour.
              toResponseMarshallingContext.handleError(error)
              context.stop(self)
            }

            /** Logs the connection-closed event and terminates this actor. */
            def connectionClosed(event: Http.ConnectionClosed): Unit = {
              log.warning("Binary response streaming stopped due to {}", event)
              context.stop(self)
            }

            /** Initial state: nothing has been sent to the client yet. */
            def receive = {
              case data: Array[Byte] =>
                // The first array starts the chunked message; the responder it returns
                // receives all subsequent chunks.
                val responder = toResponseMarshallingContext.startChunkedMessage(
                  HttpResponse(entity = HttpEntity(value.contentType, data)), ack = Some(ChunkSent))
                context.become(waitingForResponder(responder, sender))
              case Input.EOF =>
                // The enumerator was empty: answer with a plain, empty entity.
                toResponseMarshallingContext.marshalTo(
                  HttpResponse(entity = HttpEntity(value.contentType, HttpData.Empty)))
                context.stop(self)
              case error: Throwable =>
                failWith(error)
            }

            /** The previous chunk was acknowledged; waiting for the next array or the end. */
            def waitingForData(responder: ActorRef): Actor.Receive = {
              case data: Array[Byte] =>
                responder ! MessageChunk(data).withAck(ChunkSent)
                context.become(waitingForResponder(responder, sender))
              case Input.EOF =>
                responder ! ChunkedMessageEnd
                context.stop(self)
              case error: Throwable =>
                failWith(error)
              case event: Http.ConnectionClosed =>
                // Previously unhandled in this state: the actor kept writing chunks to a
                // closed connection and never terminated.
                connectionClosed(event)
            }

            /** A chunk is in flight; `requestor` is answered once it is acknowledged. */
            def waitingForResponder(responder: ActorRef, requestor: ActorRef): Actor.Receive = {
              case ChunkSent =>
                requestor ! ChunkSent
                context.become(waitingForData(responder))
              case event: Http.ConnectionClosed =>
                connectionClosed(event)
              case error: Throwable =>
                // Previously unhandled in this state: a failure forwarded while the ack was
                // outstanding (e.g. the ask timing out) left the actor alive forever.
                failWith(error)
            }
          }
        }
      }

      /**
       * Builds an iteratee that forwards every array to `responseStreamerActor` and waits
       * for its reply before accepting further input.
       */
      def byteArrayIteratee(responseStreamerActor: ActorRef): Iteratee[Array[Byte], Unit] = {
        def continuation(input: Input[Array[Byte]]): Iteratee[Array[Byte], Unit] = input match {
          case Input.Empty =>
            Cont[Array[Byte], Unit](continuation)
          case Input.El(array) =>
            // Continue only after the actor has replied to the ask for this array.
            Iteratee.flatten((responseStreamerActor ? array) map (_ => Cont[Array[Byte], Unit](continuation)))
          case Input.EOF =>
            responseStreamerActor ! Input.EOF
            Done((), Input.EOF)
        }
        Cont[Array[Byte], Unit](continuation)
      }

      // Run the enumerator through the iteratee; any non-fatal failure is handed to the actor.
      (value.data |>>> byteArrayIteratee(responseStreamerActor)).onFailure {
        case NonFatal(error) => responseStreamerActor ! error
      }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment