Last active
July 14, 2018 01:24
-
-
Save ochrons/d1e35ba003566aaf38c172689e03397e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.amazonaws.HttpMethod | |
import com.amazonaws.services.s3.AmazonS3Client | |
import com.amazonaws.services.s3.model._ | |
... | |
// S3 client used below to generate the presigned URLs; built with the SDK's no-argument
// constructor.
val s3Client = new AmazonS3Client()
/**
  * Generates a presigned S3 URL for `pathRaw` and opens a matching Akka HTTP connection flow.
  *
  * The URL is valid for one hour. For `PUT` requests the content type
  * `application/octet-stream` is part of the presigned request, so the actual upload has to
  * send that same content type.
  *
  * @param pathRaw object key inside the bucket; leading slashes are dropped and the result is trimmed
  * @param method  HTTP method the presigned URL is generated for
  * @return a pair of the outgoing connection flow (to the URL's host and port) and the presigned URL
  */
private def getFlow(pathRaw: String, method: HttpMethod) = {
  // clean the path
  val path = pathRaw.dropWhile(_ == '/').trim

  // presigned URL expires one hour from now
  val validForMillis = 1000L * 60 * 60
  val expiration = new java.util.Date(System.currentTimeMillis() + validForMillis)

  val request = new GeneratePresignedUrlRequest(bucketName, path)
  request.setMethod(method)
  request.setExpiration(expiration)
  if (method == HttpMethod.PUT)
    request.setContentType("application/octet-stream")

  val url = s3Client.generatePresignedUrl(request)
  // NOTE(review): a presigned URL carries its own signature and stays usable until it expires;
  // confirm that writing it to the debug log is acceptable for this deployment.
  log.debug(s"S3 URL = $url")

  // Choose the transport by protocol first. Checking the port first would open a plain-text
  // connection to an https URL whenever that URL carries an explicit port.
  val port = if (url.getPort != -1) url.getPort else url.getDefaultPort
  val flow =
    if (url.getProtocol == "https")
      Http().outgoingConnectionHttps(url.getHost, port)
    else
      Http().outgoingConnection(url.getHost, port)

  (flow, url)
}
/**
  * Creates a sink that uploads the incoming bytes to S3 under `path` with a single HTTP PUT
  * against a presigned URL.
  *
  * The materialized future never fails: it completes with the S3 response once one arrives
  * (whatever its status), or with `Unit` after a non-fatal error has been logged. Callers that
  * need to know whether the upload succeeded have to inspect the completed value.
  *
  * @param path   object key to store the data under
  * @param length exact number of bytes that will be streamed; used as the request's content length
  * @return a sink consuming the data, materializing a future that completes when the upload is done
  */
override def sink(path: String, length: Long): Sink[ByteString, Future[Any]] = {
  val (flow, url) = getFlow(path, HttpMethod.PUT)

  // Wraps the whole incoming byte stream as the entity of a single PUT request.
  def wrapRequest(source: Source[ByteString, Any]) =
    HttpRequest(
      uri = url.toString,
      method = HttpMethods.PUT,
      entity = HttpEntity(ContentTypes.`application/octet-stream`, length, source)
    )

  // `prefixAndTail(0)` emits exactly one element: an empty prefix paired with a Source of
  // everything that follows. Taking zero prefix elements means that "tail" is the complete
  // incoming stream, so the stream of ByteStrings becomes a stream of one Source[ByteString],
  // which is then used as the body of the single HTTP request.
  Flow[ByteString].prefixAndTail(0)
    .map(_._2)
    .map(wrapRequest)
    .via(flow)
    .toMat(Sink.head)(Keep.right)
    .mapMaterializedValue(_.map { resp =>
      // S3 reports rejected uploads through the response status, so only claim success on 2xx.
      if (resp.status.isSuccess)
        log.debug(s"File $path stored to S3")
      else
        log.error(s"Storing $path to S3 failed with ${resp.status}")
      resp
    }.recover {
      case scala.util.control.NonFatal(ex) =>
        log.error(s"Error while storing $path to S3: $ex")
    })
}
/**
  * Opens a download stream for the S3 object stored under `path`, using an HTTP GET against a
  * presigned URL.
  *
  * @param path object key to read
  * @return a future of the object's byte stream together with its content length when the
  *         response entity reports one (`None` otherwise, e.g. for chunked responses); the
  *         future fails with an `Exception` if S3 answers with anything other than 200 OK
  */
override def source(path: String): Future[(Source[ByteString, Any], Option[Long])] = {
  val (flow, url) = getFlow(path, HttpMethod.GET)
  val req = HttpRequest(
    uri = url.toString,
    method = HttpMethods.GET
  )

  Source.single(req).via(flow).runWith(Sink.head).map { response =>
    response.status match {
      case StatusCodes.OK =>
        // Stream the body straight from the S3 connection; the size limit is lifted because
        // stored objects can be arbitrarily large.
        val entity = response.entity
        (entity.withSizeLimit(Long.MaxValue).dataBytes, entity.contentLengthOption)
      case StatusCodes.NotFound =>
        throw new Exception(s"$path not found")
      case error =>
        log.error(error.toString)
        throw new Exception(s"S3 failed with ${response.status}")
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for sharing this. Any chance you could elaborate a bit on this? I'm really intrigued (especially by the "0").
Thank you so much!