Skip to content

Instantly share code, notes, and snippets.

@keeganwitt
Created April 7, 2014 20:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save keeganwitt/10044534 to your computer and use it in GitHub Desktop.
Save keeganwitt/10044534 to your computer and use it in GitHub Desktop.
A Groovy script to download HDFS files in parallel
// Usage: groovy hdfsDownloader.groovy <prod|qa|dev-username> <file> [<output directory>]
// <file> can be an individual file or a directory of part files ("part-*").
// If <output directory> is not specified, the current working directory is used.
import groovy.json.JsonSlurper
import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import java.util.concurrent.TimeUnit
/**
 * Command-line entry point that downloads a file, or a directory of
 * "part-" files, from HDFS through the WebHDFS REST interface.
 *
 * Arguments: environment ("prod", "qa", or a dev username), the HDFS path,
 * and an optional local output directory (defaults to the working directory).
 */
class Main {
    static final int THREAD_POOL_SIZE = 20
    static final String DEV_BASE_URL = "http://webhdfs.mycompany.com:14000/webhdfs/v1"
    static final String QA_BASE_URL = "http://webhdfs.mycompany.com:14000/webhdfs/v1"
    static final String PROD_BASE_URL = "http://webhdfs.mycompany.com:14000/webhdfs/v1"
    static final String QA_USERNAME = "qa"
    static final String PROD_USERNAME = "prod"
    static final String USAGE =
            "Usage: groovy hdfsDownloader.groovy <prod|qa|dev-username> <file> [<output directory>]"

    /**
     * Lists the requested HDFS path, then downloads either the single file or
     * every "part-" file of the directory, and blocks until all downloads finish.
     *
     * @param args environment/username, HDFS path, optional output directory
     * @throws IllegalArgumentException if fewer than two arguments are given
     * @throws IOException if the listing is malformed or the local directory cannot be created
     */
    static void main(String[] args) {
        if (args == null || args.size() < 2) {
            throw new IllegalArgumentException(USAGE)
        }
        String environment = args[0]
        String file = args[1]
        File outputDir = (args.size() == 3) ? new File(args[2]) : new File("").getCanonicalFile()

        String username
        String baseUrl
        if (environment == "prod") {
            username = PROD_USERNAME
            baseUrl = PROD_BASE_URL
        } else if (environment == "qa") {
            username = QA_USERNAME
            baseUrl = QA_BASE_URL
        } else {
            // Anything else is treated as a developer's own username.
            username = environment
            baseUrl = DEV_BASE_URL
        }

        String remotePath = remotePathFor(baseUrl, file, username)
        def json = new JsonSlurper().parseText(
                webHdfsUrl(remotePath, "LISTSTATUS", username).toURL().getText())
        def fileStatuses = json?.FileStatuses?.FileStatus
        if (fileStatuses == null) {
            throw new IOException("Unexpected LISTSTATUS response for [${remotePath}]")
        }

        // Per the WebHDFS docs, LISTSTATUS on a plain file returns exactly one
        // entry with an empty pathSuffix; a directory returns one entry per
        // child (possibly one or zero), each with a non-empty pathSuffix.
        boolean singleFile = fileStatuses.size() == 1 &&
                fileStatuses[0].type == "FILE" &&
                !fileStatuses[0].pathSuffix

        String localName = new File(file).getName()
        ExecutorService executor = singleFile ?
                Executors.newSingleThreadExecutor() :
                Executors.newFixedThreadPool(THREAD_POOL_SIZE)
        try {
            if (singleFile) {
                executor.execute(new FileWriter(
                        new File(outputDir, localName),
                        webHdfsUrl(remotePath, "OPEN", username).toURL()))
            } else {
                File dir = new File(outputDir, localName)
                if (!dir.isDirectory() && !dir.mkdirs()) {
                    throw new IOException("Could not create directory [${dir.getCanonicalPath()}]")
                }
                fileStatuses.each { status ->
                    if (status.type == "FILE" && status.pathSuffix?.startsWith("part-")) {
                        executor.execute(new FileWriter(
                                new File(dir, status.pathSuffix),
                                webHdfsUrl("${remotePath}/${status.pathSuffix}", "OPEN", username).toURL()))
                    }
                }
            }
        } finally {
            // Always stop accepting work so the pool's non-daemon threads
            // cannot keep the JVM alive after a failure above.
            executor.shutdown()
        }
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS)
    }

    /**
     * Resolves the HDFS path to a WebHDFS resource URL without query string.
     * Absolute paths are appended to the base URL as-is; relative paths are
     * resolved against the user's home directory ("/user/&lt;username&gt;").
     */
    private static String remotePathFor(String baseUrl, String file, String username) {
        return file.startsWith("/") ? "${baseUrl}${file}" : "${baseUrl}/user/${username}/${file}"
    }

    /** Appends the WebHDFS operation and user.name query parameters to a resource URL. */
    private static String webHdfsUrl(String remotePath, String op, String username) {
        return "${remotePath}?op=${op}&user.name=${username}"
    }
}
/**
 * Runnable that copies everything served at a URL into a local file,
 * printing a message before and after the transfer.
 *
 * Note: this script-local class shares its simple name with
 * java.io.FileWriter; the name is kept because Main refers to it.
 */
class FileWriter implements Runnable {
    /** Size in bytes of the buffer used for each read from the URL. */
    static final int CHUNK_SIZE = 1024
    /** Local destination file; overwritten if it already exists. */
    File file
    /** Source URL whose content is downloaded. */
    URL url

    /**
     * @param file local destination file
     * @param url  source URL to read from
     */
    FileWriter(File file, URL url) {
        this.file = file
        this.url = url
    }

    /**
     * Streams the URL's content into the destination file. Both streams are
     * closed (and the output flushed) even if reading or writing fails.
     */
    @Override
    void run() {
        println "Writing file [${file.getCanonicalPath()}]..."
        // The URL is opened first so that a failed connection does not leave
        // an empty local file behind. with*Stream closes on success and on error.
        url.withInputStream { InputStream input ->
            file.withOutputStream { OutputStream output ->
                byte[] chunk = new byte[CHUNK_SIZE]
                int n
                // read() signals end-of-stream with -1.
                while ((n = input.read(chunk)) != -1) {
                    output.write(chunk, 0, n)
                }
            }
        }
        println "Finished writing file [${file.getCanonicalPath()}]."
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment