Last active
December 23, 2019 19:00
-
-
Save lucianenache/5b2061c87a999f21392c083237ea087d to your computer and use it in GitHub Desktop.
Scala FS2 stream to AWS S3 example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// SBT build definition for the example below.
name := "delete"

scalaVersion := "2.12.7"

// FS2 for streaming, and the AWS SDK (v1) S3 client.
libraryDependencies ++= Seq(
  "co.fs2"        %% "fs2-core"        % "1.0.0",
  "com.amazonaws" %  "aws-java-sdk-s3" % "1.11.442"
)
// Source | |
import scala.collection.JavaConverters._ | |
import cats.effect.IO | |
import cats.implicits._ | |
import fs2._ | |
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration | |
import com.amazonaws.services.s3.{ AmazonS3, AmazonS3ClientBuilder } | |
import com.amazonaws.services.s3.model.{ ListObjectsV2Request, ListObjectsV2Result, S3ObjectSummary } | |
/** Example program: list every object in an S3 bucket as an FS2 stream,
  * following `ListObjectsV2` pagination, and print each object's key and size.
  */
object FS2S3 {

  /** Keys requested per page. Kept deliberately tiny so that even a small
    * bucket exercises the pagination logic.
    */
  private val PageSize = 2

  /** Name of the bucket that `main` creates and lists. */
  private val BucketName = "test-bucket"

  /** Streams the summary of every object in a bucket, page by page.
    *
    * Nothing is requested from S3 until the returned stream is run, and the
    * stream can be run more than once: every page request is built from
    * scratch inside the effect, so no state leaks from one run into the next.
    *
    * @param s3         client used to issue the `listObjectsV2` calls
    * @param bucketName bucket whose objects are listed
    * @return a stream of object summaries, in the order S3 returns them
    */
  def getObjectSummaries(s3: AmazonS3)(bucketName: String): Stream[IO, S3ObjectSummary] = {
    // Builds a brand-new request for one page. The SDK's fluent `with*`
    // setters mutate and return the receiver, so sharing a single request
    // across pages (or across runs of the stream) would leave a stale
    // continuation token on it.
    def request(token: Option[String]): ListObjectsV2Request = {
      val base = new ListObjectsV2Request().withBucketName(bucketName).withMaxKeys(PageSize)
      token.fold(base)(t => base.withContinuationToken(t))
    }

    // The continuation token of a page, if S3 reports that more pages follow.
    // `Option(...)` guards against a null token coming back from the Java API.
    def nextToken(page: ListObjectsV2Result): Option[String] =
      if (page.isTruncated) Option(page.getNextContinuationToken) else None

    // Unfold step: when a token is present, fetch that page and pair it with
    // the token of the page after it; when absent, end the unfold.
    def fetchNext(token: Option[String]): IO[Option[(ListObjectsV2Result, Option[String])]] =
      IO {
        token.map { t =>
          val page = s3.listObjectsV2(request(Some(t)))
          (page, nextToken(page))
        }
      }

    // All result pages: the first page, then every continuation page.
    val pages: Stream[IO, ListObjectsV2Result] =
      Stream.eval(IO(s3.listObjectsV2(request(None)))).flatMap { first =>
        Stream.emit(first) ++ Stream.unfoldEval(nextToken(first))(fetchNext)
      }

    // Flatten each page into its individual object summaries.
    for {
      page    <- pages
      summary <- Stream.emits(page.getObjectSummaries.asScala)
    } yield summary
  }

  /** Constructs an S3 client pointed at a local endpoint; construction is an effect. */
  def s3Client: IO[AmazonS3] = IO {
    AmazonS3ClientBuilder.standard()
      .withEndpointConfiguration(new EndpointConfiguration("http://localhost:4572", "us-west-2"))
      .withPathStyleAccessEnabled(true)
      .build()
  }

  /** Shuts down an S3 client; shutting down is an effect. */
  def closeS3Client(s3: AmazonS3): IO[Unit] = IO(s3.shutdown())

  /** Creates the example bucket, then prints a header followed by the key and
    * size of each object in it.
    *
    * Any exception raised while the stream runs propagates out of `main`.
    */
  def main(args: Array[String]): Unit = {
    // A stream that has the effect of printing the header.
    val header: Stream[IO, Unit] =
      Stream.emit("Listing objects").covary[IO] to Sink.showLinesStdOut[IO, String]

    // Acquire the client (released via `bracket` even on failure), create the
    // bucket, then stream its object summaries.
    val summaries: Stream[IO, S3ObjectSummary] = for {
      s3      <- Stream.bracket(s3Client)(closeS3Client)
      _       <- Stream.eval(IO(s3.createBucket(BucketName)))
      summary <- getObjectSummaries(s3)(BucketName)
    } yield summary

    // A stream that has the effect of printing one line per object summary.
    val listing: Stream[IO, Unit] =
      summaries
        .map(os => s" - ${os.getKey()} (size: ${os.getSize()})")
        .to(Sink.showLinesStdOut[IO, String])

    // Print the header, then the listing; discard the stream's values and run
    // the resulting `IO` synchronously.
    (header ++ listing).compile.drain.unsafeRunSync()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment