Skip to content

Instantly share code, notes, and snippets.

@afsalthaj
Last active November 7, 2020 14:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save afsalthaj/c0f432d324391121dd8367dff99bb9e8 to your computer and use it in GitHub Desktop.
import cats.effect.{ IO, Resource }
import com.amazonaws.services.s3.AmazonS3
import com.amazonaws.services.s3.model.ListObjectsV2Request
import org.apache.commons.lang.StringUtils
/** Lists every object under `path` and returns the file name derived from each key.
  *
  * `listObjectsV2` returns at most one page of results (up to 1000 keys) per call. The listing is
  * therefore followed page by page, using the continuation token until S3 reports the result is no
  * longer truncated. Otherwise objects beyond the first page would be silently ignored.
  *
  * @param client S3 client used for the listing calls
  * @param path   supplies the bucket (`parentBucket`) and key prefix (`prefix`) to list
  * @return the result of `getFileName` for every listed key, in listing order
  */
def getKeys(client: AmazonS3, path: S3Path): IO[List[String]] = IO {
  @scala.annotation.tailrec
  def collectKeys(request: ListObjectsV2Request, acc: Vector[String]): Vector[String] = {
    val page = client.listObjectsV2(request)
    val keys = acc ++ page.getObjectSummaries.asScala.map(_.getKey)
    if (page.isTruncated)
      collectKeys(request.withContinuationToken(page.getNextContinuationToken), keys)
    else
      keys
  }

  val firstRequest =
    new ListObjectsV2Request().withBucketName(path.parentBucket).withPrefix(path.prefix)

  collectKeys(firstRequest, Vector.empty).toList.map(getFileName)
}
/** Lists the files under `source` and builds, for each one, a `Resource` that downloads it to a
  * temporary local file and pairs it with its generated target name.
  *
  * Running the returned `IO` only performs the S3 listing. The per-file work (the S3 `getObject`
  * call, the copy into a temporary file, and name generation) is deferred into the individual
  * resources passed to `Resources`.
  *
  * @param s3Client       client used both to list and to download
  * @param source         S3 location whose files are downloaded
  * @param dataFileNaming produces the target name for each downloaded file
  * @param featureDetails passed through to the naming input
  * @param runDate        passed through to the naming input
  * @return the per-file download resources, wrapped in `Resources`
  * @note the `IO` fails with a `RuntimeException` ("Zero files in the s3 path …") when the listing
  *       is empty. A naming failure surfaces as the throwable built by
  *       `toThrowable("Failed to generate a file name.")` when that file's resource is acquired.
  */
def downloadAllFiles(
  s3Client: AmazonS3,
  source: S3Path,
  dataFileNaming: Naming[TargetFile],
  featureDetails: FeatureTablePath,
  runDate: ZonedDateTime
): IO[Resources[TemporaryLocalFile]] =
  getKeys(s3Client, source)
    .flatTap(
      fileNames =>
        IO.raiseError(
            new RuntimeException(s"Zero files in the s3 path ${source.asString}")
          )
          .whenA(fileNames.isEmpty)
    )
    .map { fileNames =>
      fileNames.map { fileName =>
        downloadFile(s3Client, source, fileName).evalMap { localFile =>
          IO.fromEither(
              dataFileNaming
                .getName(Naming.Input(featureDetails, runDate, fileName), _.toTargetFileName)
                .leftMap(_.toThrowable("Failed to generate a file name."))
            )
            // Building `Data` is pure, so a plain `map` suffices (no `IO.pure` + `flatMap` needed).
            .map(target => Data(localFile, target.name, FileType.DataFile))
        }
      }
    }
    .map(resources => Resources(resources))
/** Fetches the S3 object `fileName` and copies its content into a new temporary local file.
  *
  * The `S3Object` is wrapped with `Resource.fromAutoCloseable`, so it is closed when the returned
  * resource is released. The temporary file is created through
  * `TemporaryLocalFile.createResource`; its cleanup is whatever that resource defines.
  *
  * NOTE(review): `source.asString` is passed as the *bucket name* to `getObject`, whereas
  * `getKeys` lists with `source.parentBucket` + `source.prefix` and reduces keys to file names.
  * This only works if `asString` yields something the SDK accepts as "bucket/prefix". Confirm
  * against `S3Path` before relying on it.
  *
  * @param amazonS3 client used for the download
  * @param source   S3 location the file lives under
  * @param fileName name of the file (as produced by `getKeys`) to fetch
  * @return a resource yielding the populated temporary local file
  */
def downloadFile(
  amazonS3: AmazonS3,
  source: S3Path,
  fileName: String
): Resource[IO, TemporaryLocalFile] = {
  val s3Object =
    Resource.fromAutoCloseable(
      IO(amazonS3.getObject(source.asString, fileName))
        .withMessageOnError(s"Failed to get ${fileName} from ${source.asString}")
    )

  s3Object.flatMap { s3Obj =>
    TemporaryLocalFile
      .createResource("S3_DOWNLOADED", Some(Prefix.mk(fileName)), None)
      .evalMap(localFile => localFile.writeFrom(s3Obj).map(_ => localFile))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment