Created
July 28, 2013 07:16
-
-
Save th0br0/6097794 to your computer and use it in GitHub Desktop.
play-s3 streaming upload
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package lib | |
import fly.play.s3._ | |
import java.io.OutputStream | |
import scala.concurrent.Future | |
import scala.concurrent.Await | |
import scala.concurrent.duration.Duration | |
import play.api.http.ContentTypeOf | |
import play.api.libs.concurrent.Execution.Implicits.defaultContext | |
import play.api.libs.concurrent.Akka | |
import play.api.libs.ws.Response | |
import play.api.Logger | |
import collection.mutable.{ ArrayBuffer, ListBuffer } | |
import play.api.Play.current | |
import akka.actor._ | |
case class StreamingBucketFile(name: String, contentType: String, acl: Option[ACL] = None, headers: Option[Map[String, String]] = None) | |
case class UploadPart(data: Array[Byte]) | |
case object UploadCanceled | |
case object UploadComplete | |
class S3UploadActor(s3: S3, bucket: Bucket, bucketFile: StreamingBucketFile, val uploadId: String) extends Actor with ActorLogging{ | |
case class Part(number: Int, etag: String) | |
var partCount = 1 | |
var uploadedParts = ListBuffer[Part]() | |
def receive = { | |
case UploadPart(data) => { | |
val partNumber = partCount | |
partCount += 1 | |
val f = s3.awsWithSigner | |
.url(httpUrl(bucket.name, bucketFile.name)) | |
.withQueryString("uploadId" -> uploadId) | |
.withQueryString("partNumber" -> partNumber.toString) | |
.put(data) | |
S3Response { (status, response) => | |
val etag = response.header("ETag") | |
uploadedParts += Part(partNumber, etag.get) | |
}(Await.result(f, Duration.Inf)) | |
} | |
case UploadComplete => { | |
val parts = uploadedParts.toList.sortBy(_.number).map{part => | |
<Part> | |
<PartNumber>{part.number}</PartNumber> | |
<ETag>{part.etag}</ETag> | |
</Part> | |
} | |
val xml = <CompleteMultipartUpload>{parts}</CompleteMultipartUpload> | |
val f = s3.awsWithSigner | |
.url(httpUrl(bucket.name, bucketFile.name)) | |
.withQueryString("uploadId" -> uploadId) | |
.post(xml) | |
Await.result(f, Duration.Inf) | |
self ! PoisonPill | |
} | |
case UploadCanceled => { | |
val f = s3.awsWithSigner | |
.url(httpUrl(bucket.name, bucketFile.name)) | |
.withQueryString("uploadId" -> uploadId) | |
.delete() | |
Await.result(f, Duration.Inf) | |
self ! PoisonPill | |
} | |
} | |
private def httpUrl(bucketName: String, path: String) = { | |
var protocol = "http" | |
if (s3.https) protocol += "s" | |
// now build all url | |
protocol + "://" + bucketName + "." + s3.host + "/" + path | |
} | |
} | |
class S3UploadStream(s3: S3, bucket: Bucket, bucketFile: StreamingBucketFile) extends OutputStream { | |
val MIN_PART_SIZE = 5 * 1024 * 1024 | |
var ref: ActorRef = null | |
var buffer = ArrayBuffer[Byte]() | |
var closed = false | |
initiateMultipartUpload() map S3Response { (status, response) => | |
val xml = response.xml | |
val uploadId = (xml \ "UploadId").text | |
ref = Akka.system.actorOf(Props(new S3UploadActor(s3, bucket,bucketFile, uploadId))) | |
} | |
override def close() = { flush(); ref ! UploadComplete; closed = true } | |
def cancel() = { ref ! UploadCanceled; closed = true } | |
override def flush() = { | |
if(buffer.length > 0) { | |
val data = buffer.toArray | |
buffer = ArrayBuffer[Byte]() | |
ref ! UploadPart(data) | |
} | |
} | |
override def write(b: Array[Byte]) = { | |
if(closed) | |
throw new RuntimeException("stream is closed") | |
buffer ++= b | |
if(buffer.length >= MIN_PART_SIZE) { | |
flush() | |
} | |
} | |
override def write(b: Int) = {throw new UnsupportedOperationException()} | |
override def write(b: Array[Byte], offset: Int, length: Int) = {throw new UnsupportedOperationException()} | |
private def initiateMultipartUpload(): Future[Response] = { | |
val acl = bucketFile.acl getOrElse PUBLIC_READ | |
implicit val fileContentType = ContentTypeOf[Array[Byte]](Some(bucketFile.contentType)) | |
val headers = (bucketFile.headers getOrElse Map.empty).toList | |
// XXX - create feature request for empty post | |
s3.awsWithSigner | |
.url(httpUrl(bucket.name, bucketFile.name)) | |
.withQueryString(("uploads" -> "")) | |
.withHeaders("X-Amz-acl" -> acl.value :: headers: _*) | |
.post("") | |
} | |
private def httpUrl(bucketName: String, path: String) = { | |
var protocol = "http" | |
if (s3.https) protocol += "s" | |
// now build all url | |
protocol + "://" + bucketName + "." + s3.host + "/" + path | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment