Skip to content

Instantly share code, notes, and snippets.

@lucianenache
Last active December 23, 2019 19:00
Show Gist options
  • Save lucianenache/5b2061c87a999f21392c083237ea087d to your computer and use it in GitHub Desktop.
Save lucianenache/5b2061c87a999f21392c083237ea087d to your computer and use it in GitHub Desktop.
Scala FS2 example: streaming a paginated AWS S3 object listing (ListObjectsV2)
// SBT build definition: Scala 2.12, FS2 core, and the AWS SDK v1 S3 client.
// NOTE(review): cats-effect (for cats.effect.IO below) is not listed here;
// presumably it arrives transitively through fs2-core — confirm.
name := "delete"
scalaVersion := "2.12.7"
libraryDependencies += "co.fs2" %% "fs2-core" % "1.0.0"
libraryDependencies += "com.amazonaws" % "aws-java-sdk-s3" % "1.11.442"
// Source
import scala.collection.JavaConverters._
import cats.effect.IO
import cats.implicits._
import fs2._
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.services.s3.{ AmazonS3, AmazonS3ClientBuilder }
import com.amazonaws.services.s3.model.{ ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary }
object FS2S3 {

  /** Streams every object summary in `bucketName`, following ListObjectsV2
    * pagination until a response is no longer truncated.
    *
    * Each page is fetched lazily inside `IO`, so no request is sent until the
    * stream is run. Every page is requested with a freshly built
    * `ListObjectsV2Request`, so the returned stream can be run more than once
    * and each run lists the bucket from the beginning.
    *
    * @param s3         client used for the `listObjectsV2` calls
    * @param bucketName bucket whose objects are listed
    * @param maxKeys    page size passed to `withMaxKeys`; must be positive. The
    *                   default of 2 keeps the original demo behaviour of forcing
    *                   pagination even on a tiny bucket.
    * @return the summaries of every page, in the order the pages are returned
    */
  def getObjectSummaries(s3: AmazonS3)(bucketName: String, maxKeys: Int = 2): Stream[IO, S3ObjectSummary] = {
    require(maxKeys > 0, s"maxKeys must be positive, got $maxKeys")

    // Build a NEW request for every page. The SDK's `withXxx` setters mutate and
    // return the receiver, so one shared request would keep the last
    // continuation token and a second run of this stream would start mid-listing.
    def request(token: Option[String]): ListObjectsV2Request = {
      val req = new ListObjectsV2Request().withBucketName(bucketName).withMaxKeys(maxKeys)
      token.foreach(t => req.setContinuationToken(t))
      req
    }

    // Continuation token for the next page, present only while the listing is
    // truncated. `Option(...)` maps a null token to None; a null token passed
    // back in would otherwise restart the listing from the first page.
    def cont(res: ListObjectsV2Result): Option[String] =
      if (res.isTruncated) Option(res.getNextContinuationToken) else None

    // Unfold step: given a token, fetch that page and pair it with the token of
    // the page after it; given None, end the stream.
    def next(maybe: Option[String]): IO[Option[(ListObjectsV2Result, Option[String])]] =
      IO {
        maybe.map { token =>
          val res = s3.listObjectsV2(request(Some(token)))
          (res, cont(res))
        }
      }

    // The first page is always fetched; later pages only while tokens keep coming.
    val pages: Stream[IO, ListObjectsV2Result] =
      Stream.eval(IO(s3.listObjectsV2(request(None)))).flatMap { first =>
        Stream.emit(first) ++ Stream.unfoldEval(cont(first))(next)
      }

    // Flatten each page into its individual object summaries.
    pages.flatMap(page => Stream.emits(page.getObjectSummaries.asScala))
  }

  /** Builds an S3 client, deferred in `IO`.
    *
    * The client targets `http://localhost:4572` in region `us-west-2` with
    * path-style access enabled (presumably a local S3 emulator such as
    * LocalStack — confirm before pointing this at real AWS).
    */
  def s3Client: IO[AmazonS3] = IO {
    AmazonS3ClientBuilder
      .standard()
      .withEndpointConfiguration(new EndpointConfiguration("http://localhost:4572", "us-west-2"))
      .withPathStyleAccessEnabled(true)
      .build()
  }

  /** Shuts down an S3 client, deferred in `IO`. */
  def closeS3Client(s3: AmazonS3): IO[Unit] = IO(s3.shutdown())

  /** Prints a header, then the key and size of every object in "test-bucket".
    *
    * The S3 client is acquired with `Stream.bracket`, so it is shut down when
    * the stream finishes, whether it succeeds or fails. Any failure (e.g. S3
    * unreachable) is rethrown from `unsafeRunSync()` and escapes `main`; it is
    * not swallowed.
    */
  def main(args: Array[String]): Unit = {
    val stdout = Sink.showLinesStdOut[IO, String]

    // Header line, printed before the client is created.
    val header = Stream.emit("Listing objects").covary[IO].to(stdout)

    // Acquire the client, create the bucket, then stream its object summaries.
    val summaries = for {
      s3 <- Stream.bracket(s3Client)(closeS3Client)
      _  <- Stream.eval(IO(s3.createBucket("test-bucket")))
      s  <- getObjectSummaries(s3)("test-bucket")
    } yield s

    // One output line per object summary.
    val listing = summaries
      .map(os => s" - ${os.getKey()} (size: ${os.getSize()})")
      .to(stdout)

    (header ++ listing).compile.drain.unsafeRunSync()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment