Skip to content

Instantly share code, notes, and snippets.

@lintool
Created May 19, 2011 20:24
Show Gist options
  • Save lintool/981642 to your computer and use it in GitHub Desktop.
Stream of columns from a directory of tab-delimited flat files in HDFS
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Iterator;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.io.CharStreams;
import com.google.common.io.InputSupplier;
// Problem: you have a directory of tab-delimited flat files in HDFS that you want to read as one continuous stream of lines.
// Solution: lazily concatenate the files and iterate over the parsed columns, as follows...
public class StreamOfColumns {
  /**
   * Reads every flat file under an HDFS directory as one lazily-concatenated
   * stream of lines, splits each line on tab characters, and prints the
   * resulting columns joined by commas, one line per input line.
   *
   * @param args optional; args[0] is the HDFS directory to read. Defaults to
   *     "foo/" (the original hard-coded path) when no argument is given, so
   *     existing invocations behave the same.
   * @throws IOException if the filesystem cannot be contacted, the directory
   *     cannot be listed, or reading any file fails
   */
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    // Must be final: it is captured by the anonymous InputSupplier below.
    final FileSystem fs = FileSystem.get(conf);
    // Generalized: accept the directory on the command line, keeping the
    // original "foo/" as a backward-compatible default.
    Path p = new Path(args.length > 0 ? args[0] : "foo/");

    // One lazy InputSupplier per file in the directory. Opening is deferred
    // until CharStreams.join actually needs the file, so only one HDFS stream
    // is open at a time.
    Iterable<InputSupplier<InputStreamReader>> suppliers =
        Iterables.transform(Arrays.asList(fs.listStatus(p)),
            new Function<FileStatus, InputSupplier<InputStreamReader>>() {
              @Override
              public InputSupplier<InputStreamReader> apply(final FileStatus status) {
                return new InputSupplier<InputStreamReader>() {
                  @Override
                  public InputStreamReader getInput() throws IOException {
                    // Fix: decode explicitly as UTF-8. The original used the
                    // no-charset constructor, which falls back to the
                    // platform-default charset and so parses the same data
                    // differently on different machines.
                    return new InputStreamReader(fs.open(status.getPath()), "UTF-8");
                  }
                };
              }
            });

    // Old commons-io LineIterator implements the raw Iterator interface, so
    // the cast is unavoidable here; suppress the unchecked warning at the
    // narrowest possible scope.
    @SuppressWarnings("unchecked")
    Iterator<String> lines =
        (Iterator<String>) IOUtils.lineIterator(CharStreams.join(suppliers).getInput());

    // Parse each tab-delimited line into its column values.
    Iterator<String[]> iter = Iterators.transform(lines,
        new Function<String, String[]>() {
          @Override
          public String[] apply(String s) {
            return s.split("\\t");
          }
        });

    while (iter.hasNext()) {
      System.out.println(Joiner.on(",").join(iter.next()));
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment