Skip to content

Instantly share code, notes, and snippets.

@th0br0
Created July 28, 2013 07:16
Show Gist options
  • Save th0br0/6097794 to your computer and use it in GitHub Desktop.
Save th0br0/6097794 to your computer and use it in GitHub Desktop.
play-s3 streaming upload
package lib
import fly.play.s3._
import java.io.OutputStream
import scala.concurrent.Future
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import play.api.http.ContentTypeOf
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.libs.concurrent.Akka
import play.api.libs.ws.Response
import play.api.Logger
import collection.mutable.{ ArrayBuffer, ListBuffer }
import play.api.Play.current
import akka.actor._
case class StreamingBucketFile(name: String, contentType: String, acl: Option[ACL] = None, headers: Option[Map[String, String]] = None)
case class UploadPart(data: Array[Byte])
case object UploadCanceled
case object UploadComplete
class S3UploadActor(s3: S3, bucket: Bucket, bucketFile: StreamingBucketFile, val uploadId: String) extends Actor with ActorLogging{
case class Part(number: Int, etag: String)
var partCount = 1
var uploadedParts = ListBuffer[Part]()
def receive = {
case UploadPart(data) => {
val partNumber = partCount
partCount += 1
val f = s3.awsWithSigner
.url(httpUrl(bucket.name, bucketFile.name))
.withQueryString("uploadId" -> uploadId)
.withQueryString("partNumber" -> partNumber.toString)
.put(data)
S3Response { (status, response) =>
val etag = response.header("ETag")
uploadedParts += Part(partNumber, etag.get)
}(Await.result(f, Duration.Inf))
}
case UploadComplete => {
val parts = uploadedParts.toList.sortBy(_.number).map{part =>
<Part>
<PartNumber>{part.number}</PartNumber>
<ETag>{part.etag}</ETag>
</Part>
}
val xml = <CompleteMultipartUpload>{parts}</CompleteMultipartUpload>
val f = s3.awsWithSigner
.url(httpUrl(bucket.name, bucketFile.name))
.withQueryString("uploadId" -> uploadId)
.post(xml)
Await.result(f, Duration.Inf)
self ! PoisonPill
}
case UploadCanceled => {
val f = s3.awsWithSigner
.url(httpUrl(bucket.name, bucketFile.name))
.withQueryString("uploadId" -> uploadId)
.delete()
Await.result(f, Duration.Inf)
self ! PoisonPill
}
}
private def httpUrl(bucketName: String, path: String) = {
var protocol = "http"
if (s3.https) protocol += "s"
// now build all url
protocol + "://" + bucketName + "." + s3.host + "/" + path
}
}
class S3UploadStream(s3: S3, bucket: Bucket, bucketFile: StreamingBucketFile) extends OutputStream {
val MIN_PART_SIZE = 5 * 1024 * 1024
var ref: ActorRef = null
var buffer = ArrayBuffer[Byte]()
var closed = false
initiateMultipartUpload() map S3Response { (status, response) =>
val xml = response.xml
val uploadId = (xml \ "UploadId").text
ref = Akka.system.actorOf(Props(new S3UploadActor(s3, bucket,bucketFile, uploadId)))
}
override def close() = { flush(); ref ! UploadComplete; closed = true }
def cancel() = { ref ! UploadCanceled; closed = true }
override def flush() = {
if(buffer.length > 0) {
val data = buffer.toArray
buffer = ArrayBuffer[Byte]()
ref ! UploadPart(data)
}
}
override def write(b: Array[Byte]) = {
if(closed)
throw new RuntimeException("stream is closed")
buffer ++= b
if(buffer.length >= MIN_PART_SIZE) {
flush()
}
}
override def write(b: Int) = {throw new UnsupportedOperationException()}
override def write(b: Array[Byte], offset: Int, length: Int) = {throw new UnsupportedOperationException()}
private def initiateMultipartUpload(): Future[Response] = {
val acl = bucketFile.acl getOrElse PUBLIC_READ
implicit val fileContentType = ContentTypeOf[Array[Byte]](Some(bucketFile.contentType))
val headers = (bucketFile.headers getOrElse Map.empty).toList
// XXX - create feature request for empty post
s3.awsWithSigner
.url(httpUrl(bucket.name, bucketFile.name))
.withQueryString(("uploads" -> ""))
.withHeaders("X-Amz-acl" -> acl.value :: headers: _*)
.post("")
}
private def httpUrl(bucketName: String, path: String) = {
var protocol = "http"
if (s3.https) protocol += "s"
// now build all url
protocol + "://" + bucketName + "." + s3.host + "/" + path
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment