@ianoc-stripe
Created September 25, 2018 23:00
S3FileSystem: a Hadoop FileSystem that exposes the s3:// scheme by wrapping and delegating to the S3A implementation (tests included below).
/*
Copyright 2018 Stripe Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
package com.stripe.hadoop.fs.s3
import org.apache.hadoop.fs._
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.util.Progressable
import org.apache.hadoop.fs.permission.FsPermission
import java.util.EnumSet
object S3FileSystem {
  def changeScheme(u: URI, newScheme: String): URI = {
    new URI(newScheme,
      u.getUserInfo(),
      u.getHost(),
      u.getPort(),
      u.getPath(),
      u.getQuery(),
      u.getFragment())
  }

  def s3ToS3a(u: URI): URI =
    u.getScheme match {
      case "s3" => changeScheme(u, "s3a")
      case _ => u
    }

  def s3aToS3(u: URI): URI =
    u.getScheme match {
      case "s3a" => changeScheme(u, "s3")
      case _ => u
    }

  def s3ToS3a(u: Path): Path = new Path(s3ToS3a(u.toUri))

  def s3aToS3(u: Path): Path = new Path(s3aToS3(u.toUri))

  def s3ToS3a(u: FileStatus): FileStatus = {
    u.setPath(s3ToS3a(u.getPath))
    u
  }

  def s3aToS3(u: FileStatus): FileStatus = {
    u.setPath(s3aToS3(u.getPath))
    u
  }

  def s3ToS3a(u: LocatedFileStatus): LocatedFileStatus = {
    u.setPath(s3ToS3a(u.getPath))
    u
  }

  def s3aToS3(u: LocatedFileStatus): LocatedFileStatus = {
    u.setPath(s3aToS3(u.getPath))
    u
  }

  def mapRemoteIterator[T, U](iter: RemoteIterator[T])(fn: T => U): RemoteIterator[U] =
    new RemoteIterator[U] {
      def hasNext = iter.hasNext
      def next = fn(iter.next)
    }

  def s3ToS3a(u: RemoteIterator[LocatedFileStatus]): RemoteIterator[LocatedFileStatus] =
    mapRemoteIterator(u) { s3ToS3a(_) }

  def s3aToS3(u: RemoteIterator[LocatedFileStatus]): RemoteIterator[LocatedFileStatus] =
    mapRemoteIterator(u) { s3aToS3(_) }
}
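
// A quick illustrative sketch (not part of the original code) of what the helpers above
// do: only the scheme changes, every other URI component is preserved, and non-s3 schemes
// pass through untouched.
//
//   S3FileSystem.s3ToS3a(new Path("s3://myBucket/path/to/file"))
//     // => new Path("s3a://myBucket/path/to/file")
//   S3FileSystem.s3aToS3(new URI("s3a://myBucket/path/to/file"))
//     // => new URI("s3://myBucket/path/to/file")
//   S3FileSystem.s3ToS3a(new Path("hdfs://nn/a/b"))
//     // => new Path("hdfs://nn/a/b") (unchanged)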
class S3FileSystem(private[this] val innerS3AFs: ExposingS3AFileSystem) extends FileSystem {
  import S3FileSystem._

  def this() = this(new ExposingS3AFileSystem())

  private[this] var ourUri: URI = null

  /** Called after a new FileSystem instance is constructed.
   * @param name a uri whose authority section names the host, port, etc.
   *   for this FileSystem
   * @param originalConf the configuration to use for the FS. The
   *   bucket-specific options are patched over the base ones before any use is
   *   made of the config.
   */
  override def initialize(name: URI, originalConf: Configuration): Unit = {
    // The passed-in uri should be s3://-like; this is our URI.
    // The hidden s3a file system will have an s3a version of the same uri,
    // but if we are asked, our name/URI should be the original one passed in.
    ourUri = name
    innerS3AFs.initialize(s3ToS3a(name), originalConf)
  }
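
  // For example (mirrored by the "test initialize and getUri" test below):
  // initialize(new URI("s3://abcd"), conf) leaves getUri() reporting s3://abcd
  // while the inner filesystem is initialized with s3a://abcd.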
  /**
   * Append to an existing file (optional operation).
   * @param path the existing file to be appended.
   * @param bufferSize the size of the buffer to be used.
   * @param progress for reporting progress if it is not null.
   * @throws IOException indicating that append is not supported.
   */
  def append(path: Path, bufferSize: Int, progress: Progressable): FSDataOutputStream =
    innerS3AFs.append(s3ToS3a(path), bufferSize, progress)
  /**
   * Create an FSDataOutputStream at the indicated Path with write-progress
   * reporting.
   * @param path the file name to open
   * @param permission the permission to set.
   * @param overwrite if a file with this name already exists, then if true,
   *   the file will be overwritten, and if false an error will be thrown.
   * @param bufferSize the size of the buffer to be used.
   * @param replication required block replication for the file.
   * @param blockSize the requested block size.
   * @param progress the progress reporter.
   * @throws IOException in the event of IO related errors.
   * @see #setPermission(Path, FsPermission)
   */
  def create(path: Path,
             permission: FsPermission,
             overwrite: Boolean,
             bufferSize: Int,
             replication: Short,
             blockSize: Long,
             progress: Progressable): FSDataOutputStream =
    innerS3AFs.create(s3ToS3a(path),
      permission,
      overwrite,
      bufferSize,
      replication,
      blockSize,
      progress)
  /**
   * Delete a Path. This operation is at least {@code O(files)}, with
   * added overheads to enumerate the path. It is also not atomic.
   *
   * @param path the path to delete.
   * @param recursive if path is a directory and set to
   *   true, the directory is deleted, else an exception is thrown. In
   *   the case of a file, recursive can be set to either true or false.
   * @return true if delete is successful, else false.
   * @throws IOException due to inability to delete a directory or file.
   */
  def delete(path: Path, recursive: Boolean): Boolean = innerS3AFs.delete(s3ToS3a(path), recursive)
  /**
   * Return a file status object that represents the path.
   * @param path the path we want information about
   * @return a FileStatus object
   * @throws FileNotFoundException when the path does not exist
   * @throws IOException on other problems.
   */
  def getFileStatus(path: Path): FileStatus = s3aToS3(innerS3AFs.doGetFileStatus(s3ToS3a(path)))
  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return "s3"
   */
  override def getScheme(): String = "s3"

  /**
   * Returns a URI whose scheme and authority identify this FileSystem.
   */
  def getUri(): URI = ourUri

  override def getDefaultPort() = -1
  def getWorkingDirectory(): Path = s3aToS3(innerS3AFs.getWorkingDirectory())

  /**
   * Set the current working directory for the given file system. All relative
   * paths will be resolved relative to it.
   *
   * @param path the new current working directory.
   */
  def setWorkingDirectory(path: Path): Unit = innerS3AFs.setWorkingDirectory(s3ToS3a(path))
  /**
   * List the statuses of the files/directories in the given path if the path is
   * a directory.
   *
   * @param path given path
   * @return the statuses of the files/directories in the given path
   * @throws FileNotFoundException when the path does not exist;
   *   IOException see specific implementation
   */
  def listStatus(path: Path): Array[FileStatus] = innerS3AFs.listStatus(s3ToS3a(path)).map(s3aToS3)
  /**
   * Make the given path and all non-existent parents into
   * directories. Has the semantics of Unix {@code 'mkdir -p'}.
   * Existence of the directory hierarchy is not an error.
   * @param path path to create
   * @param permission permission to apply to path
   * @return true if a directory was created
   * @throws FileAlreadyExistsException there is a file at the path specified
   * @throws IOException other IO problems
   */
  // TODO: If we have created an empty file at /foo/bar and we then call
  // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
  def mkdirs(path: Path, permission: FsPermission): Boolean =
    innerS3AFs.mkdirs(s3ToS3a(path), permission)

  def open(path: Path, bufferSize: Int): FSDataInputStream =
    innerS3AFs.open(s3ToS3a(path), bufferSize)
  /**
   * Renames Path src to Path dest. Can take place on local fs
   * or remote DFS.
   *
   * Warning: S3 does not support renames. This method does a copy which can
   * take S3 some time to execute with large files and directories. Since
   * there is no Progressable passed in, this can time out jobs.
   *
   * Note: This implementation differs from other S3 drivers. Specifically:
   * <pre>
   * Fails if src is a file and dest is a directory.
   * Fails if src is a directory and dest is a file.
   * Fails if the parent of dest does not exist or is a file.
   * Fails if dest is a directory that is not empty.
   * </pre>
   *
   * @param src path to be renamed
   * @param dest new path after rename
   * @throws IOException on IO failure
   * @return true if rename is successful
   */
  def rename(src: Path, dest: Path): Boolean = innerS3AFs.rename(s3ToS3a(src), s3ToS3a(dest))
  // These aren't required to be overridden by FileSystem, but the s3a file system overrides them.

  /**
   * Check that a Path belongs to this FileSystem.
   * Unlike the superclass, this version does not look at authority,
   * only hostnames.
   * @param path to check
   * @throws IllegalArgumentException if there is an FS mismatch
   */
  override def checkPath(path: Path) = innerS3AFs.doCheckPath(s3ToS3a(path))

  override def canonicalizeUri(rawUri: URI) =
    s3aToS3(innerS3AFs.doCanonicalizeUri(s3ToS3a(rawUri)))
  /**
   * {@inheritDoc}
   * @throws FileNotFoundException if the parent directory is not present -or
   *   is not a directory.
   */
  override def createNonRecursive(
    path: Path,
    permission: FsPermission,
    flags: EnumSet[CreateFlag],
    bufferSize: Int,
    replication: Short,
    blockSize: Long,
    progress: Progressable
  ): FSDataOutputStream =
    innerS3AFs.doCreateNonRecursive(s3ToS3a(path),
      permission,
      flags,
      bufferSize,
      replication,
      blockSize,
      progress)
  /**
   * The src file is on the local disk. Add it to FS at
   * the given dst name.
   *
   * This version doesn't need to create a temporary file to calculate the md5.
   * Sadly this doesn't seem to be used by the shell cp :(
   *
   * delSrc indicates if the source should be removed
   * @param delSrc whether to delete the src
   * @param overwrite whether to overwrite an existing file
   * @param src path
   * @param dst path
   * @throws IOException IO problem
   * @throws FileAlreadyExistsException the destination file exists and
   *   overwrite==false
   * @throws AmazonClientException failure in the AWS SDK
   */
  override def copyFromLocalFile(
    delSrc: Boolean,
    overwrite: Boolean,
    src: Path,
    dst: Path
  ): Unit = innerS3AFs.copyFromLocalFile(delSrc, overwrite, s3ToS3a(src), s3ToS3a(dst))
  /**
   * Close the filesystem. This shuts down all transfers.
   * @throws IOException IO problem
   */
  override def close(): Unit = innerS3AFs.close()

  override def getCanonicalServiceName(): String = innerS3AFs.getCanonicalServiceName()
  /**
   * {@inheritDoc}.
   *
   * This implementation is optimized for S3, which can do a bulk listing
   * of all entries under a path in one single operation. Thus there is
   * no need to recursively walk the directory tree.
   *
   * Instead a {@link ListObjectsRequest} is created requesting a (windowed)
   * listing of all entries under the given path. This is used to construct
   * an {@code ObjectListingIterator} instance, iteratively returning the
   * sequence of lists of elements under the path. This is then iterated
   * over in a {@code FileStatusListingIterator}, which generates
   * {@link S3AFileStatus} instances, one per listing entry.
   * These are then translated into {@link LocatedFileStatus} instances.
   *
   * This is essentially a nested and wrapped set of iterators, with some
   * generator classes; an architecture which may become less convoluted
   * using lambda-expressions.
   * @param f a path
   * @param recursive if the subdirectories need to be traversed recursively
   *
   * @return an iterator that traverses statuses of the files/directories
   *   in the given path
   * @throws FileNotFoundException if {@code path} does not exist
   * @throws IOException if any I/O error occurred
   */
  override def listFiles(f: Path, recursive: Boolean): RemoteIterator[LocatedFileStatus] =
    s3aToS3(innerS3AFs.listFiles(s3ToS3a(f), recursive))
  /**
   * Override superclass so as to add statistic collection.
   * {@inheritDoc}
   */
  override def listLocatedStatus(f: Path): RemoteIterator[LocatedFileStatus] =
    s3aToS3(innerS3AFs.listLocatedStatus(s3ToS3a(f)))
}
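
// A minimal usage sketch (an assumption, not from the original gist): with this class and
// ExposingS3AFileSystem on the classpath, Hadoop can be pointed at the wrapper for the
// s3:// scheme via the standard fs.<scheme>.impl configuration key.
//
//   val conf = new org.apache.hadoop.conf.Configuration()
//   conf.set("fs.s3.impl", classOf[com.stripe.hadoop.fs.s3.S3FileSystem].getName)
//   val fs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI("s3://myBucket/"), conf)
//   fs.getScheme  // "s3", while reads and writes are delegated to the inner s3a filesystem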
///// TESTS:
package com.stripe.hadoop.fs.s3

import org.scalatest._
import org.junit.runner.RunWith
import org.scalatest.junit.JUnitRunner
import org.scalatest.mock.MockitoSugar
import org.apache.hadoop.fs._
import org.mockito.Mockito._
import org.mockito.ArgumentMatchers.{any, anyBoolean}
import java.net.URI
import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.util.Progressable

@RunWith(classOf[JUnitRunner])
class S3FileSystemTest extends FunSuite with MockitoSugar {
  val s3Path = new Path("s3://myBucket/path/to/file")
  val s3PathAsS3a = new Path("s3a://myBucket/path/to/file")
  val s3PathB = new Path("s3://myBucket/path/to/fileB")
  val s3PathBAsS3a = new Path("s3a://myBucket/path/to/fileB")

  def withS3(testFn: (ExposingS3AFileSystem, S3FileSystem) => Unit): Unit = {
    val mockedS3aFilesystem: ExposingS3AFileSystem = mock[ExposingS3AFileSystem]
    val s3Fs = new S3FileSystem(mockedS3aFilesystem)
    testFn(mockedS3aFilesystem, s3Fs)
  }
test("test append") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
s3Fs.append(s3Path, 10, null)
verify(mockedS3aFilesystem).append(s3PathAsS3a, 10, null);
}
}
test("test create") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
s3Fs.create(s3Path, null: FsPermission, true, 10, 3: Short, 20L, null: Progressable)
verify(mockedS3aFilesystem).create(s3PathAsS3a, null: FsPermission, true, 10, 3: Short, 20L, null: Progressable);
}
}
test("test delete") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
s3Fs.delete(s3Path, true)
verify(mockedS3aFilesystem).delete(s3PathAsS3a, true)
}
}
test("test getFileStatus") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
val fs = new FileStatus(1234L, false, 3, 128, 1234L, s3PathAsS3a)
when(mockedS3aFilesystem.doGetFileStatus(any())).thenReturn(fs)
val returnedFileStatus = s3Fs.getFileStatus(s3Path)
verify(mockedS3aFilesystem).doGetFileStatus(s3PathAsS3a)
// we ensure the path gets round tripped fully
assert(returnedFileStatus.getPath() === s3Path)
assert(returnedFileStatus.getLen() === 1234L)
}
}
test("test getScheme") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
// should not proxy this since callers should think this is an S3 scheme
assert(s3Fs.getScheme == "s3")
}
}
test("test initialize and getUri") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
val s3Uri = new URI("s3://abcd")
val s3aUri = new URI("s3a://abcd")
s3Fs.initialize(s3Uri, null)
verify(mockedS3aFilesystem).initialize(s3aUri, null)
assert(s3Fs.getUri() === s3Uri)
}
}
test("test getWorkingDirectory") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
when(mockedS3aFilesystem.getWorkingDirectory()).thenReturn(s3PathAsS3a)
val workingDirectory = s3Fs.getWorkingDirectory()
assert(workingDirectory === s3Path)
}
}
test("test setWorkingDirectory") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
val returnedFileStatus = s3Fs.setWorkingDirectory(s3Path)
verify(mockedS3aFilesystem).setWorkingDirectory(s3PathAsS3a)
}
}
test("test listStatus") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
val fileStatuses = (0 until 20).map { size =>
new FileStatus(size.toLong, false, 3, 128, 1234L, s3PathAsS3a)
}.toArray
when(mockedS3aFilesystem.listStatus(any())).thenReturn(fileStatuses)
val returnedFileStatusArray = s3Fs.listStatus(s3Path)
verify(mockedS3aFilesystem).listStatus(s3PathAsS3a)
// we ensure the path gets round tripped fully
assert(returnedFileStatusArray.toList === fileStatuses.toList)
}
}
test("test mkdirs") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
s3Fs.mkdirs(s3Path, null)
verify(mockedS3aFilesystem).mkdirs(s3PathAsS3a, null)
}
}
test("test open") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
s3Fs.open(s3Path, 55)
verify(mockedS3aFilesystem).open(s3PathAsS3a, 55)
}
}
test("test rename") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
s3Fs.rename(s3Path, s3PathB)
verify(mockedS3aFilesystem).rename(s3PathAsS3a, s3PathBAsS3a)
}
}
test("test checkPath") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
s3Fs.checkPath(s3Path)
verify(mockedS3aFilesystem).doCheckPath(s3PathAsS3a)
}
}
test("test canonicalizeUri") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
when(mockedS3aFilesystem.doCanonicalizeUri(any())).thenReturn(s3PathAsS3a.toUri)
s3Fs.canonicalizeUri(s3Path.toUri)
verify(mockedS3aFilesystem).doCanonicalizeUri(s3PathAsS3a.toUri)
}
}
test("test createNonRecursive") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
s3Fs.createNonRecursive(s3Path, null, null, 133, 3: Short, 128, null)
verify(mockedS3aFilesystem).doCreateNonRecursive(s3PathAsS3a, null, null, 133, 3: Short, 128, null)
}
}
test("test copyFromLocalFile") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
s3Fs.copyFromLocalFile(false, true, s3Path, s3PathB)
verify(mockedS3aFilesystem).copyFromLocalFile(false, true, s3PathAsS3a, s3PathBAsS3a)
}
}
test("test close") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
s3Fs.close()
verify(mockedS3aFilesystem).close()
}
}
test("test getCanonicalServiceName") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
when(mockedS3aFilesystem.getCanonicalServiceName()).thenReturn("abcded")
assert(s3Fs.getCanonicalServiceName() === "abcded")
}
}
  def remoteIteratorToList[T](r: RemoteIterator[T]): List[T] = {
    @annotation.tailrec
    def go(acc: List[T], r: RemoteIterator[T]): List[T] = {
      if (r.hasNext()) {
        go(r.next() :: acc, r)
      } else acc.reverse
    }
    go(Nil, r)
  }

  def listToRemoteIterator[T](l: List[T]): RemoteIterator[T] = new RemoteIterator[T] {
    var remaining: List[T] = l
    def hasNext = remaining != Nil
    def next() = {
      require(hasNext)
      val (h :: t) = remaining
      remaining = t
      h
    }
  }
test("test listFiles") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
val locatedFileStatus = (0 until 20).map { size =>
new LocatedFileStatus(new FileStatus(size.toLong, false, 3, 128, 1234L, s3PathBAsS3a), Array())
}.toList
when(mockedS3aFilesystem.listFiles(any(), anyBoolean())).thenReturn(listToRemoteIterator(locatedFileStatus))
val returnedIterator = s3Fs.listFiles(s3Path, true)
val returnedList = remoteIteratorToList(returnedIterator)
verify(mockedS3aFilesystem).listFiles(s3PathAsS3a, true)
assert(returnedList.map(_.getPath).distinct === List(s3PathB))
assert(returnedList === locatedFileStatus.map{ e => e.setPath(s3PathB); e})
}
}
test("test listLocatedStatus") {
withS3 { case (mockedS3aFilesystem, s3Fs) =>
val locatedFileStatus = (0 until 20).map { size =>
new LocatedFileStatus(new FileStatus(size.toLong, false, 3, 128, 1234L, s3PathBAsS3a), Array())
}.toList
when(mockedS3aFilesystem.listLocatedStatus(any())).thenReturn(listToRemoteIterator(locatedFileStatus))
val returnedIterator = s3Fs.listLocatedStatus(s3Path)
val returnedList = remoteIteratorToList(returnedIterator)
verify(mockedS3aFilesystem).listLocatedStatus(s3PathAsS3a)
assert(returnedList.map(_.getPath).distinct === List(s3PathB))
assert(returnedList === locatedFileStatus.map{ e => e.setPath(s3PathB); e})
}
}
}