Skip to content

Instantly share code, notes, and snippets.

@ochrons
Last active July 14, 2018 01:24
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ochrons/d1e35ba003566aaf38c172689e03397e to your computer and use it in GitHub Desktop.
Save ochrons/d1e35ba003566aaf38c172689e03397e to your computer and use it in GitHub Desktop.
import com.amazonaws.HttpMethod
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model._
...
// S3 client used by getFlow to generate pre-signed URLs. It is created with the
// no-argument constructor (presumably credentials and region come from the SDK
// defaults — confirm against the deployment configuration).
val s3Client = new AmazonS3Client
/**
 * Generates a pre-signed S3 URL for the given object path and opens an outgoing
 * connection flow to the host of that URL.
 *
 * @param pathRaw object key; leading slashes are dropped and the rest is trimmed
 * @param method  HTTP method the URL is signed for
 * @return a pair of (connection flow to the URL's host, pre-signed URL)
 */
private def getFlow(pathRaw: String, method: HttpMethod) = {
  // clean the path
  val path = pathRaw.dropWhile(_ == '/').trim

  // the signed URL is valid for one hour from now
  val expiration = new java.util.Date(System.currentTimeMillis() + 1000L * 60 * 60)

  val request = new GeneratePresignedUrlRequest(bucketName, path)
  request.setMethod(method)
  request.setExpiration(expiration)
  // uploads are sent as application/octet-stream by sink, so the same content type
  // is set on the request being signed
  if (method == HttpMethod.PUT) {
    request.setContentType("application/octet-stream")
  }

  val url = s3Client.generatePresignedUrl(request)
  log.debug(s"S3 URL = $url")

  // Choose the transport from the protocol first and only then apply the port.
  // Checking the port first would send an https URL with an explicit port over
  // a plain-text connection.
  val flow = (url.getProtocol, url.getPort) match {
    case ("https", -1)   => Http().outgoingConnectionHttps(url.getHost)
    case ("https", port) => Http().outgoingConnectionHttps(url.getHost, port)
    case (_, -1)         => Http().outgoingConnection(url.getHost)
    case (_, port)       => Http().outgoingConnection(url.getHost, port)
  }
  (flow, url)
}
/**
 * Builds a sink that uploads everything written to it to S3 under `path`
 * using a single pre-signed PUT request.
 *
 * @param path   object key to store the data under
 * @param length number of bytes that will be written; used as the entity's content length
 * @return a sink whose materialized future completes with the HTTP response, or
 *         with `()` when the request itself failed (the failure is logged, not rethrown)
 */
override def sink(path: String, length: Long): Sink[ByteString, Future[Any]] = {
  val (flow, url) = getFlow(path, HttpMethod.PUT)

  // wraps the whole incoming byte stream into the entity of one PUT request
  def wrapRequest(source: Source[ByteString, Any]) =
    HttpRequest(
      uri = url.toString,
      method = HttpMethods.PUT,
      entity = HttpEntity(ContentTypes.`application/octet-stream`, length, source)
    )

  // prefixAndTail(0) emits exactly one element: (empty prefix, source of all the
  // incoming chunks). Keeping only the tail turns the stream of ByteStrings into a
  // single Source[ByteString, _] that can be used as the request entity.
  Flow[ByteString]
    .prefixAndTail(0)
    .map(_._2)
    .map(wrapRequest)
    .via(flow)
    .toMat(Sink.head)(Keep.right)
    .mapMaterializedValue(_.map { resp =>
      // only report success when S3 actually accepted the upload
      if (resp.status.isSuccess) log.debug(s"File $path stored to S3")
      else log.error(s"Storing $path to S3 failed with ${resp.status}")
      resp
    }.recover {
      // best effort: the failure is logged with its cause and the future still succeeds
      case scala.util.control.NonFatal(ex) =>
        log.error(s"Error while storing $path to S3: $ex")
    })
}
/**
 * Fetches the object stored under `path` from S3 using a pre-signed GET request.
 *
 * @param path object key to read
 * @return a future holding the object's byte stream and `None` for its length;
 *         the future fails with an `Exception` when S3 answers with anything
 *         other than 200 OK
 */
override def source(path: String): Future[(Source[ByteString, Any], Option[Long])] = {
  val (flow, url) = getFlow(path, HttpMethod.GET)
  val request = HttpRequest(
    uri = url.toString,
    method = HttpMethods.GET
  )

  Source.single(request).via(flow).runWith(Sink.head).map { response =>
    response.status match {
      case StatusCodes.OK =>
        // hand the S3 response stream to the caller, with the entity size limit lifted
        (response.entity.withSizeLimit(Long.MaxValue).dataBytes, None)
      case failure =>
        // the response body is not handed to anyone on this path, so it is drained
        // here instead of being left unconsumed on the connection
        response.entity.dataBytes.runWith(Sink.ignore)
        if (failure == StatusCodes.NotFound) {
          throw new Exception(s"$path not found")
        }
        log.error(failure.toString)
        throw new Exception(s"S3 failed with ${response.status}")
    }
  }
}
@amorroxic
Copy link

Thanks for sharing this. Any chance you could elaborate a bit on this part? I'm really intrigued (especially by the "0").

// Akka Stream magic :)
Flow[ByteString].prefixAndTail(0)..

Thank you so much!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment