Last active
February 5, 2021 22:26
-
-
Save skritch/2903a43c2e671ebe46fe87d03896b1e4 to your computer and use it in GitHub Desktop.
List S3 files fast with Flink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.collection.JavaConverters._ | |
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain | |
import com.amazonaws.services.s3.model.{ListObjectsV2Request, ListObjectsV2Result} | |
import com.amazonaws.services.s3.{AmazonS3, AmazonS3ClientBuilder} | |
import org.apache.flink.api.common.serialization.SimpleStringEncoder | |
import org.apache.flink.api.scala._ | |
import org.apache.flink.core.fs.Path | |
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink | |
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.BasePathBucketAssigner | |
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy | |
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} | |
import org.joda.time.{DateTime, DateTimeZone} | |
/** Metadata for a single S3 object, as copied out of a listing response.
  *
  * @param key          object key
  * @param eTag         ETag reported for the object
  * @param size         size reported for the object
  * @param lastModified last-modified timestamp of the object
  */
case class S3Object(
  key: String,
  eTag: String,
  size: Long,
  lastModified: DateTime
) {

  /** Renders this object as a single pipe-delimited line: key, eTag, size and
    * the last-modified timestamp converted to UTC.
    */
  def toRow: String = {
    val modifiedUtc = lastModified.withZone(DateTimeZone.UTC)
    Seq[Any](key, eTag, size, modifiedUtc).mkString("|")
  }
}
/** Flink streaming job that lists every object under an S3 prefix.
  *
  * Each prefix is listed one "directory" level at a time; the sub-prefixes that a
  * listing returns are fed back through a streaming iteration so they get listed
  * too, while the objects are written out as pipe-delimited rows.
  */
object ListFilesJob {

  /** Delimiter handed to S3 so that a listing returns common prefixes. */
  val delimiter: String = "/"

  /** Sixty seconds in milliseconds. Used both as the iteration's maximum wait time
    * and as the sink's rollover and inactivity intervals.
    */
  val timeoutMillis: Int = 60 * 1000

  /** S3 client using the default AWS credentials provider chain.
    * NOTE(review): presumably lazy and transient so the client is created where it
    * is first used rather than shipped with a closure; confirm.
    */
  @transient lazy val s3Client: AmazonS3 =
    AmazonS3ClientBuilder
      .standard()
      .withCredentials(new DefaultAWSCredentialsProviderChain())
      .build()

  /** Lists one level of `bucket` under `prefix`.
    *
    * @param bucket bucket to list
    * @param prefix key prefix to list under
    * @return an iterator yielding, page by page, `Right(object)` for every object
    *         summary followed by `Left(commonPrefix)` for every common prefix
    */
  def listObjects(bucket: String, prefix: String): Iterator[Either[String, S3Object]] = {

    // Issues `request` right away, then continues with the next page for as long as
    // the previous result is truncated.
    def pagesFrom(request: ListObjectsV2Request): Iterator[ListObjectsV2Result] = {
      val page: ListObjectsV2Result = s3Client.listObjectsV2(request)

      // Declared as a method and handed to `++`, which takes its argument by name,
      // so the follow-up request is only issued once the current page is consumed.
      def remaining: Iterator[ListObjectsV2Result] =
        if (page.isTruncated) {
          pagesFrom(request.withContinuationToken(page.getNextContinuationToken))
        } else {
          Iterator.empty
        }

      Iterator.single(page) ++ remaining
    }

    // Converts one result page into objects (Right) followed by sub-prefixes (Left).
    def entriesOf(page: ListObjectsV2Result): Seq[Either[String, S3Object]] = {
      val files: Seq[Either[String, S3Object]] = page.getObjectSummaries.asScala
        .map { summary =>
          Right(S3Object(
            summary.getKey,
            summary.getETag,
            summary.getSize,
            new DateTime(summary.getLastModified)
          ))
        }
      val subPrefixes: Seq[Either[String, S3Object]] = page.getCommonPrefixes
        .asScala.map(commonPrefix => Left(commonPrefix))
      files ++ subPrefixes
    }

    val initialRequest = new ListObjectsV2Request()
      .withBucketName(bucket)
      .withPrefix(prefix)
      .withDelimiter(delimiter)

    pagesFrom(initialRequest).flatMap(entriesOf)
  }

  /** Lists everything reachable from `prefixes` using a streaming iteration.
    *
    * @param bucket   bucket to list
    * @param prefixes initial prefixes to start listing from
    * @return stream of every object found under the given prefixes
    */
  def getNestedObjects(bucket: String, prefixes: DataStream[String]): DataStream[S3Object] = {

    // One iteration step: list each pending prefix, feed the discovered sub-prefixes
    // back into the loop and emit the discovered objects downstream.
    def step(pending: DataStream[String]): (DataStream[String], DataStream[S3Object]) = {
      val listed: DataStream[Either[String, S3Object]] = pending
        .flatMap(p => listObjects(bucket, p))
      // `rebalance` redistributes the fed-back prefixes across the parallel subtasks.
      val feedback = listed.flatMap(entry => entry.left.toOption).rebalance
      val found = listed.flatMap(entry => entry.right.toOption)
      (feedback, found)
    }

    prefixes.iterate(step _, maxWaitTimeMillis = timeoutMillis)
  }

  /** Builds and executes the job.
    *
    * @param bucket bucket to list
    * @param prefix prefix to start from
    * @param output base path the row files are written to
    */
  def run(bucket: String, prefix: String, output: String): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // NOTE(review): the identity map looks like a workaround so the iteration input
    // is not the source operator itself (parallelism mismatch?); confirm.
    val seed: DataStream[String] = env.fromElements(prefix).map(x => x)
    val listed: DataStream[S3Object] = getNestedObjects(bucket, seed)

    // NOTE(review): checkpointing is not enabled in this job; verify that the part
    // files written by StreamingFileSink end up finalized as expected.
    val outputPath = new Path(output)
    val rowEncoder = new SimpleStringEncoder[String]("UTF-8")
    val sink: StreamingFileSink[String] = StreamingFileSink
      .forRowFormat(outputPath, rowEncoder)
      .withBucketAssigner(new BasePathBucketAssigner())
      .withRollingPolicy(
        DefaultRollingPolicy.builder()
          .withRolloverInterval(timeoutMillis)
          .withInactivityInterval(timeoutMillis)
          .withMaxPartSize(200 * 1024 * 1024)
          .build())
      .build()

    val rows = listed.map(_.toRow)
    rows.addSink(sink)
    // Alternative sink, kept for reference:
    // .writeAsText(output)

    env.execute()
  }

  /** Entry point. Accepts either `bucket output` (lists from the empty prefix) or
    * `bucket prefix output`; any other argument list raises IllegalArgumentException.
    */
  def main(args: Array[String]): Unit = args.toList match {
    case List(bucket, output) => run(bucket, "", output)
    case List(bucket, prefix, output) => run(bucket, prefix, output)
    case _ => throw new IllegalArgumentException("bad args")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note: I ran this locally on Flink 1.10.1, with approximately these dependencies: