/**
 * Based on the S3N RDD gist from Jeremy Pierre: https://gist.github.com/j14159/d3cbe172f7b962d74d09
 *
 * Modified to use Jets3t.
 */
package net.tobysullivan.spark.rdd
import java.io.{BufferedInputStream, BufferedReader, InputStreamReader}
import java.util.zip.GZIPInputStream
import org.apache.spark.rdd.RDD
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.jets3t.service.impl.rest.httpclient.RestS3Service
import org.jets3t.service.model.S3Bucket
import org.jets3t.service.security.AWSCredentials
import scala.io.Source
private[rdd] case class S3NPartition(idx: Int, bucket: String, path: String) extends Partition {
  def index = idx
}
/**
 * Construct and use directly; roughly equivalent to SparkContext.textFile, but takes an
 * explicit sequence of files to load. This currently makes one Partition per file.
 * Once constructed, use it like any other RDD.
 *
 * The example below constructs an RDD from all files whose keys start with
 * "some-files/file-" in the S3 bucket "my-bucket" (see the expanded sketch after this class):
 *
 * new S3NRDD(awsAccessKey, awsSecretKey, yourSparkContext, "my-bucket",
 *   new S3NListing(awsAccessKey, awsSecretKey, "my-bucket").list("some-files/file-"))
 */
class S3NRDD(awsAccessKeyId: String, awsSecretAccessKey: String, sc: SparkContext, bucket: String, files: Seq[String]) extends RDD[String](sc, Nil) {
  lazy val credentials = new AWSCredentials(awsAccessKeyId, awsSecretAccessKey)

  override def getPartitions: Array[Partition] =
    files.zipWithIndex.map { case (fn, i) => S3NPartition(i, bucket, fn) }.toArray

  override def compute(split: Partition, context: TaskContext): Iterator[String] = split match {
    case S3NPartition(_, bucket, path) =>
      val service = new RestS3Service(credentials)
      val s3Bucket = new S3Bucket(bucket)
      val obj = service.getObject(s3Bucket, path)
      val content = obj.getDataInputStream()

      // Decompress any gzipped files on the fly, keyed off the ".gz" extension
      if (obj.getKey.endsWith(".gz"))
        Source.fromInputStream(new GZIPInputStream(new BufferedInputStream(content))).getLines()
      else {
        val br = new BufferedReader(new InputStreamReader(content))
        // readLine() returns null at end of stream; close the reader and stop there
        Iterator.continually(br.readLine()).takeWhile {
          case null =>
            br.close()
            false
          case _ => true
        }
      }
  }
}
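/*
 * A minimal usage sketch, assuming a local SparkContext; the credential values and
 * bucket/prefix names below are placeholders, not part of the original gist:
 *
 *   val sc = new SparkContext("local[4]", "s3n-rdd-example")
 *   val listing = new S3NListing(awsAccessKey, awsSecretKey, "my-bucket")
 *   val rdd = new S3NRDD(awsAccessKey, awsSecretKey, sc, "my-bucket", listing.list("some-files/file-"))
 *   println(rdd.count()) // e.g. count lines across all matched files
 */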
/**
 * Simple helper to find files within the given bucket.
 */
class S3NListing(awsAccessKeyId: String, awsSecretAccessKey: String, bucket: String) {
  lazy val credentials = new AWSCredentials(awsAccessKeyId, awsSecretAccessKey)
  lazy val service = new RestS3Service(credentials)
  lazy val s3Bucket = new S3Bucket(bucket, S3Bucket.LOCATION_US)

  /**
   * List files behind a given prefix, e.g. "" for all, "my-folder",
   * "my-folder/files-that-start-like-this", etc. Will eagerly fetch
   * all truncated results.
   */
  def list(folder: String): Seq[String] =
    service.listObjects(s3Bucket, folder, null).map(_.getKey).toSeq
}
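/*
 * Listing sketch (hypothetical credentials and bucket; the prefix simply narrows
 * the keys returned):
 *
 *   val keys = new S3NListing(awsAccessKey, awsSecretKey, "my-bucket").list("some-files/")
 *   keys.foreach(println)
 */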