@j14159
Created July 18, 2014 20:40
Naive/early S3N RDD for Spark
/**
 * Started roughing out this naive S3-native filesystem RDD because I need to use IAM
 * profiles for S3 access and also because of https://issues.apache.org/jira/browse/HADOOP-3733.
 *
 * Use at your own risk: this represents maybe 30 - 45 minutes of work and testing, so
 * expect it to behave as such.
 *
 * Feedback/criticism/discussion welcome via GitHub/Twitter.
 *
 * In addition to Spark 1.0.x, this depends on Amazon's AWS Java SDK; the sbt dependency is:
 * "com.amazonaws" % "aws-java-sdk" % "1.7.4"
 */
package com.askuity.rdd

import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{ GetObjectRequest, ObjectListing }
import java.io.{ BufferedReader, InputStreamReader }
import org.apache.spark.{ Partition, SparkContext, TaskContext }
import org.apache.spark.rdd.RDD
import scala.collection.JavaConverters._

private[rdd] case class S3NPartition(idx: Int, bucket: String, path: String) extends Partition {
  def index = idx
}
/**
 * Directly construct and use. This is roughly equivalent to SparkContext.textFile, except
 * that you give it the list/sequence of files you want to load. It currently makes one
 * Partition per file and, once constructed, is used like any other RDD.
 *
 * The example below constructs an RDD from all files starting with "some-files/file-" in
 * the S3 bucket "my-bucket":
 *
 * new S3NRDD(yourSparkContext, "my-bucket", new S3NListing("my-bucket").list("some-files/file-"))
 */
class S3NRDD(sc: SparkContext, bucket: String, files: Seq[String]) extends RDD[String](sc, Nil) {

  private def instanceCreds() = new InstanceProfileCredentialsProvider().getCredentials

  // One partition per file, indexed in the order the file names were supplied.
  override def getPartitions: Array[Partition] =
    files.zipWithIndex.map { case (fn, i) => S3NPartition(i, bucket, fn) }.toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] = split match {
    case S3NPartition(_, bucket, path) =>
      val client = new AmazonS3Client(instanceCreds())
      val obj = client.getObject(new GetObjectRequest(bucket, path))
      val br = new BufferedReader(new InputStreamReader(obj.getObjectContent()))
      // Stream lines until readLine returns null (end of the object), closing the reader at that point.
      Iterator.continually(br.readLine()).takeWhile {
        case null =>
          br.close()
          false
        case _ => true
      }
  }
}
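
/**
 * Minimal construction sketch, assuming a live SparkContext named `sc` and that the
 * listed keys exist in "my-bucket" (bucket and key names here are placeholders):
 *
 *   val rdd = new S3NRDD(sc, "my-bucket", Seq("some-files/file-0", "some-files/file-1"))
 *   rdd.count() // one partition per key, lines streamed straight from S3
 */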
/**
 * Simple helper to find files within the given bucket.
 */
class S3NListing(bucket: String) {

  private def instanceCreds() = new InstanceProfileCredentialsProvider().getCredentials

  lazy val client = new AmazonS3Client(instanceCreds())

  /**
   * List files behind a given prefix, e.g. "" for all, "my-folder",
   * "my-folder/files-that-start-like-this", etc. Will eagerly fetch
   * all truncated results.
   */
  def list(folder: String) = recursiveListing(folder, None, Nil)
  @scala.annotation.tailrec
  private def recursiveListing(folder: String, prev: Option[ObjectListing], memo: List[Seq[String]]): List[String] = prev match {
    case None =>
      // First page of results for this prefix.
      val listing = client.listObjects(bucket, folder)
      val keys = listing.getObjectSummaries.asScala.map(_.getKey)
      if (listing.isTruncated)
        recursiveListing(folder, Some(listing), keys :: memo)
      else
        keys.toList
    case Some(lastListing) =>
      // Follow-up pages, fetched until S3 reports the listing is no longer truncated.
      val listing = client.listNextBatchOfObjects(lastListing)
      val keys = listing.getObjectSummaries.asScala.map(_.getKey)
      if (listing.isTruncated)
        recursiveListing(folder, Some(listing), keys :: memo)
      else
        (keys :: memo).reverse.flatten // reverse the accumulated batches so keys keep S3's listing order
  }
}
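
/**
 * End-to-end usage sketch, assuming a live SparkContext named `sc`, instance-profile
 * credentials available on the host, and the placeholder bucket/prefix from the scaladoc
 * examples above.
 */
object S3NRDDExample {
  def lineCount(sc: SparkContext): Long = {
    // Discover every key under the prefix, then build an RDD with one partition per key.
    val files = new S3NListing("my-bucket").list("some-files/file-")
    val rdd = new S3NRDD(sc, "my-bucket", files)
    rdd.count()
  }
}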