Skip to content

Instantly share code, notes, and snippets.

@Daenyth
Created May 23, 2019 17:31
Show Gist options
  • Save Daenyth/4ecf2cbd7fc885182148f60484df2fe1 to your computer and use it in GitHub Desktop.
Save Daenyth/4ecf2cbd7fc885182148f60484df2fe1 to your computer and use it in GitHub Desktop.
AlpakkaS3Put for fs2
import akka.http.scaladsl.model.{ContentType, ContentTypes}
import akka.stream.Materializer
import akka.stream.alpakka.s3.scaladsl.{S3 => AlpakkaS3}
import akka.stream.alpakka.s3.{MetaHeaders, MultipartUploadResult}
import akka.util.ByteString
import cats.effect.{ConcurrentEffect, ContextShift}
import fs2.{Pipe, Stream}
import org.http4s.Uri
import org.joda.time.DateTime
import Compat._
import scala.concurrent.{ExecutionContext, Future}
object AlpakkaS3Put {
def fromConfig[F[_]: ConcurrentEffect: ContextShift](
config: S3Config
)(
implicit m: Materializer,
ec: ExecutionContext
): AlpakkaS3Put[F] =
new AlpakkaS3Put[F](config)
}
class AlpakkaS3Put[F[_]: ContextShift](
config: S3Config
)(
implicit m: Materializer,
ec: ExecutionContext,
F: ConcurrentEffect[F]
) extends S3Put[F] {
override type PutResult = MultipartUploadResult
def upload(
key: S3Key,
contentType: ContentType = ContentTypes.`text/plain(UTF-8)`,
contentEncoding: ContentEncoding = ContentEncoding.identity,
headers: MetaHeaders = MetaHeaders(Map.empty)
): Pipe[F, Byte, MultipartUploadResult] = { in: Stream[F, Byte] =>
in.through(byteToByteString)
.through(akkaSink(key, contentType, contentEncoding, headers).toPipeMat[F])
}
private def akkaSink(key: S3Key,
contentType: ContentType,
contentEncoding: ContentEncoding,
headers: MetaHeaders)
: AkkaSink[ByteString, Future[MultipartUploadResult]] = {
val akkaSink: AkkaSink[ByteString, Future[MultipartUploadResult]] =
AlpakkaS3.multipartUpload(
config.bucket,
key = key.value,
contentType = contentType,
metaHeaders = MetaHeaders(Map(
"Content-Encoding" -> contentEncoding.value) ++ headers.metaHeaders)
)
akkaSink
}
def generateSignedUri(key: S3Key, validAsOf: DateTime): F[Uri] =
config.generatePresignedUrl[F](key, validAsOf)
}
object Compat {
type AkkaSink[-T, +Mat] = akka.stream.scaladsl.Sink[T, Mat]
val AkkaSink = akka.stream.scaladsl.Sink
/**
* A relatively efficient means of converting from a stream of bytes to
* one of akka [[ByteString]]s.
*
* This approach relies on mapping full chunks at a time to a
* single ByteString via a backing array
*/
def byteToByteString[F[_]]: Pipe[F, Byte, ByteString] =
_.mapChunks(c => Chunk.singleton(ByteString.fromArray(c.toBytes.toArray)))
implicit class RichAkkaSink[A, B](val sink: AkkaSink[A, Future[B]]) extends AnyVal {
/** Converts an akka sink with a success-status-indicating Future[B]
* materialized result into an fs2 stream which will fail if the Future fails.
* The stream returned by this will emit the Future's value one time at the end,
* then terminate.
*/
def toPipeMat[F[_]: ConcurrentEffect: ContextShift](
implicit ec: ExecutionContext,
m: Materializer
): Pipe[F, A, B] =
Compat.toPipeMat[F, A, B](sink)
/** Converts an akka sink with a success-status-indicating Future[B]
* materialized result into an fs2 stream which will fail if the Future fails.
* The stream returned by this will emit the Future's value one time at the end,
* then terminate.
*/
def toPipeMat[F[_]: ConcurrentEffect: ContextShift, A, B](
akkaSink: AkkaSink[A, Future[B]]
)(
implicit ec: ExecutionContext,
m: Materializer
): Pipe[F, A, B] = {
val mkPromise = Deferred[F, Either[Throwable, B]]
// `Pipe` is just a function of Stream[F, A] => Stream[F, B], so we take a stream as input.
in =>
Stream.eval(mkPromise).flatMap { p =>
// Akka streams produce a materialized value as a side effect of being run.
// streamz-converters allows us to have a `Future[Done] => Unit` callback when that materialized value is created.
// This callback tells the akka materialized future to store its result status into the Promise
val captureMaterializedResult: Future[B] => Unit = _.onComplete {
case Failure(ex) => p.complete(Left(ex)).toIO.unsafeRunSync
case Success(value) => p.complete(Right(value)).toIO.unsafeRunSync
}
// toSink is from streamz-converters; convert an akka sink to fs2 sink with a callback for the materialized values
val fs2Sink: Pipe[F, A, Unit] =
akkaSink.toSink(captureMaterializedResult)
val fs2Stream: Stream[F, Unit] = fs2Sink.apply(in)
val materializedResultStream: Stream[F, B] = Stream.eval {
p.get // Async wait on the promise to be completed; => F[Either[Throwable, B]]
.rethrow // F[Either[Throwable, B]] => F[B]
}
// Run the akka sink for its effects and then run stream containing the effect of getting the Promise results
fs2Stream.drain ++ materializedResultStream
}
}
}
import akka.http.scaladsl.model.{ContentType, ContentTypes}
import akka.stream.alpakka.s3.MetaHeaders
import fs2.{Pipe, Stream}
case class S3Key(value: String) extends AnyVal
case class ContentEncoding(value: String) extends AnyVal
/** Content encodings
*
* @see [[https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding#Directives]]
*/
object ContentEncoding {
val gzip = ContentEncoding("gzip")
/** No encoding */
val identity = ContentEncoding("identity")
}
/** Encapsulates "upload byte to s3" as an interface */
trait S3Put[F[_]] {
/** Implementation-defined result object from uploading to s3 */
type PutResult
/** Upload input bytes to S3 at `key`, filling in the S3 object metadata
* from `contentType`, `contentEncoding`, and `headers` */
def upload(
key: S3Key,
contentType: ContentType = ContentTypes.`text/plain(UTF-8)`,
contentEncoding: ContentEncoding = ContentEncoding.identity,
headers: MetaHeaders = MetaHeaders(Map.empty)
): Pipe[F, Byte, PutResult]
/** Generate a publicly-accessible Uri which will expire some time after `validAsOf`.
* The exact expiry time is unspecified in this interface; usually it will be driven
* by configuration in the specific instance */
def generateSignedUri(key: S3Key, validAsOf: DateTime): F[Uri]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment