List S3 files fast with Flink
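
A recursive, parallel S3 lister written as a Flink streaming job. Each listObjects call lists a single delimiter level of the bucket; discovered sub-prefixes are fed back through a Flink streaming iteration so many prefixes can be listed concurrently, and the resulting object metadata is written out as pipe-delimited rows via a StreamingFileSink.
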
import scala.collection.JavaConverters._
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result}
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder}
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.joda.time.{DateTime, DateTimeZone}

case class S3Object(
  key: String,
  eTag: String,
  size: Long,
  lastModified: DateTime
) {
  def toRow: String = s"$key|$eTag|$size|${lastModified.withZone(DateTimeZone.UTC)}"
}
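// Example row (hypothetical values):
//   data/2021/02/05/events.json|9bb58f26192e4ba00f01e2e7b136bbd8|53814|2021-02-05T22:26:00.000Z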
object ListFilesJob {
  val delimiter = "/"
  val timeoutMillis = 60 * 1000

  // Lazy and @transient so each task manager builds its own (non-serializable) client.
  @transient lazy val s3Client: AmazonS3 = AmazonS3ClientBuilder.standard()
    .withCredentials(new DefaultAWSCredentialsProviderChain())
    .build()

  // Lists one "directory" level of the bucket: Right for objects directly under
  // the prefix, Left for common prefixes (subdirectories) still to be explored.
  def listObjects(bucket: String, prefix: String): Iterator[Either[String, S3Object]] = {
    val req = new ListObjectsV2Request()
      .withBucketName(bucket)
      .withPrefix(prefix)
      .withDelimiter(delimiter)

    // Follow continuation tokens lazily: `++` takes its argument by name, so
    // each page is only fetched when the iterator reaches it.
    def paginateRequest(req: ListObjectsV2Request): Iterator[ListObjectsV2Result] = {
      val result: ListObjectsV2Result = s3Client.listObjectsV2(req)
      Iterator(result) ++ {
        if (result.isTruncated) {
          paginateRequest(req.withContinuationToken(result.getNextContinuationToken))
        } else {
          Iterator.empty
        }
      }
    }

    paginateRequest(req).flatMap { result =>
      val objects: Seq[Either[String, S3Object]] = result.getObjectSummaries.asScala
        .map { obj =>
          Right(S3Object(
            obj.getKey,
            obj.getETag,
            obj.getSize,
            new DateTime(obj.getLastModified)
          ))
        }
      val dirs: Seq[Either[String, S3Object]] = result.getCommonPrefixes
        .asScala.map(s => Left(s))
      objects ++ dirs
    }
  }

  // Recursively lists `prefixes` using Flink's streaming iteration: discovered
  // subdirectories (Left) are fed back into the loop, objects (Right) are
  // emitted downstream. The iteration terminates once no feedback element
  // arrives within `timeoutMillis`.
  def getNestedObjects(bucket: String, prefixes: DataStream[String]): DataStream[S3Object] = {
    val listIteration = (iteration: DataStream[String]) => {
      val results: DataStream[Either[String, S3Object]] = iteration
        .flatMap(p => listObjects(bucket, p))
      (
        // Rebalance the feedback so newly found prefixes spread across subtasks.
        results.flatMap(_.left.toOption).rebalance,
        results.flatMap(_.right.toOption)
      )
    }
    prefixes.iterate(listIteration, maxWaitTimeMillis = timeoutMillis)
  }

  def run(bucket: String, prefix: String, output: String): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // The no-op map lifts the parallelism-1 source to the default parallelism,
    // so the iteration body and its rebalanced feedback can run in parallel.
    val init: DataStream[String] = env.fromElements(prefix).map(x => x)
    val objects: DataStream[S3Object] = getNestedObjects(bucket, init)

    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(new Path(output), new SimpleStringEncoder[String]("UTF-8"))
      .withBucketAssigner(new BasePathBucketAssigner())
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withRolloverInterval(timeoutMillis)
          .withInactivityInterval(timeoutMillis)
          .withMaxPartSize(200 * 1024 * 1024)
          .build())
      .build()

    objects.map(o => o.toRow)
      .addSink(sink)
      // .writeAsText(output)

    env.execute()
  }

  def main(args: Array[String]): Unit = args.toList match {
    case bucket :: output :: Nil => run(bucket, "", output)
    case bucket :: prefix :: output :: Nil => run(bucket, prefix, output)
    case _ => throw new IllegalArgumentException("usage: <bucket> [<prefix>] <output>")
  }
}
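
For reference, a hypothetical invocation (bucket, prefix, and output path are made up) showing the row format produced:

// Recursively list s3://my-bucket/data/ and write pipe-delimited rows
// (key|eTag|size|lastModifiedUtc) under /tmp/s3-listing.
ListFilesJob.main(Array("my-bucket", "data/", "file:///tmp/s3-listing"))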
skritch commented Feb 5, 2021

Note: I ran this locally on Flink 1.10.1, with approximately these dependencies:


ThisBuild / scalaVersion := "2.12.12"

val flinkVersion = "1.10.1"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" % "flink-json" % flinkVersion,
  "org.apache.flink" % "flink-csv" % flinkVersion,
  "com.amazonaws" % "aws-java-sdk" % "1.11.271"
)
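
These presumably get wired into the build with the usual sbt line, which the original snippet omits:

libraryDependencies ++= flinkDependencies

If the job's output path points at s3:// rather than a local filesystem, the StreamingFileSink would additionally need Flink's flink-s3-fs-hadoop filesystem plugin available on the cluster.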
