Skip to content

Instantly share code, notes, and snippets.

@btiernay
Last active May 17, 2017 09:13
Show Gist options
  • Save btiernay/1ad5e3dea08904fe07d9 to your computer and use it in GitHub Desktop.
Example of extracting information from HDFS paths in a Spark transformation
// Input: read the files at projectPaths as a Hadoop RDD so that input-split
// metadata (source file path, offsets) remains available to the transformation.
val hadoopRDD = javaTextFile(context, projectPaths);
// Transform: per-partition pass that also receives the partition's InputSplit,
// which is how PreProcessLine recovers the HDFS path of each record's file.
// NOTE(review): the trailing `false` presumably means preservesPartitioning=false — confirm against the Spark API.
val transformed = hadoopRDD.mapPartitionsWithInputSplit(new PreProcessLine(), false);
// ...
/**
 * Creates a {@code JavaHadoopRDD} of raw text lines keyed by byte offset within the file,
 * using the old-API {@code TextInputFormat}, so downstream code can access each
 * partition's {@code InputSplit}.
 *
 * @param context the Spark context used to create the RDD
 * @param paths comma-separated input paths accepted by {@code hadoopFile}
 * @return the underlying Hadoop RDD view of the text input
 */
@SuppressWarnings("unchecked") // hadoopFile is declared to return JavaPairRDD, but the
// runtime type for old-API input formats is JavaHadoopRDD; the generic cast cannot be
// checked at runtime, hence the narrowly-scoped suppression.
private static JavaHadoopRDD<LongWritable, Text> javaTextFile(JavaSparkContext context, String paths) {
return (JavaHadoopRDD<LongWritable, Text>) context.hadoopFile(paths, TextInputFormat.class, LongWritable.class,
Text.class);
}
package org.icgc.dcc.etl.staging.function;
import static com.google.common.base.Stopwatch.createStarted;
import static com.google.common.collect.Iterables.toArray;
import static org.icgc.dcc.common.core.util.FormatUtils.formatCount;
import static org.icgc.dcc.common.core.util.FormatUtils.formatPercent;
import static org.icgc.dcc.common.core.util.Splitters.TAB;
import java.io.Serializable;
import java.util.Iterator;
import java.util.NoSuchElementException;
import lombok.val;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;
@Slf4j
public class PreProcessLine implements
Function2<InputSplit, Iterator<Tuple2<LongWritable, Text>>, Iterator<String[]>>, Serializable {

/**
 * Constants.
 */
private static final int LINE_STATUS_COUNT = 10 * 1000 * 1000;

/**
 * Lazily transforms a partition's {@code (offset, line)} records into tab-split
 * {@code String[]} rows, prepending the project name (the parent directory name of the
 * split's file) as the first column. Header lines (byte offset 0 of the file) are skipped.
 * <p>
 * The returned iterator honors the {@link Iterator} contract: {@code hasNext()} is
 * idempotent (the original implementation consumed one underlying record per call, so
 * repeated {@code hasNext()} calls silently dropped records), and {@code next()} throws
 * {@link NoSuchElementException} when exhausted instead of returning a stale record.
 *
 * @param split the partition's input split; must be a {@link FileSplit}
 * @param it the underlying records of this partition
 * @return a lazy iterator of tab-split rows with the project name prepended
 * @throws Exception declared by {@link Function2}; also {@link ClassCastException} if
 *         {@code split} is not a {@link FileSplit}
 */
@Override
public Iterator<String[]> call(final InputSplit split, final Iterator<Tuple2<LongWritable, Text>> it)
throws Exception {
val watch = createStarted();
val fileSplit = (FileSplit) split;
val length = fileSplit.getLength();
val projectName = getProjectName(fileSplit);

// Lazy iterator with single-element lookahead: hasNext() buffers the record it reads
// so that repeated hasNext() calls do not consume input.
return new Iterator<String[]>() {

private long lineCount = 0;
// Buffered lookahead; non-null iff a non-header record was read but not yet returned.
private Tuple2<LongWritable, Text> buffered;

@Override
public boolean hasNext() {
if (buffered != null) {
// Already peeked; do not consume another record.
return true;
}

// Advance past header lines until a data record is found or input is exhausted.
while (it.hasNext()) {
val record = it.next();
lineCount++;

if (lineCount % LINE_STATUS_COUNT == 0) {
val offset = getOffset(record);
val percent = offset * 1.0 / length;
log.info("{}: Processed {} lines ({} %) in {}",
fileSplit, formatCount(lineCount), formatPercent(percent), watch);
}

// Skip headers (the line starting at byte offset 0 of the file)
if (!isHeader(record)) {
buffered = record;
return true;
}
}

return false;
}

@Override
public String[] next() {
// Iterator contract: fail fast when exhausted rather than returning a stale record.
if (!hasNext()) {
throw new NoSuchElementException("No more records in split: " + fileSplit);
}

val record = buffered;
buffered = null;

// Prepend the project name as the first tab-separated column, then split.
val prependedLine = projectName + "\t" + getLine(record);
return toArray(TAB.split(prependedLine), String.class);
}

@Override
public void remove() {
throw new UnsupportedOperationException("Cannot remove a transformed iterator");
}

};
}

// A record is a header iff it begins at byte offset 0 of its file.
private static boolean isHeader(Tuple2<LongWritable, Text> record) {
return getOffset(record) == 0;
}

// Byte offset of the line within the file (the TextInputFormat key).
private static long getOffset(Tuple2<LongWritable, Text> record) {
return record._1.get();
}

// Raw line contents (the TextInputFormat value).
private static String getLine(Tuple2<LongWritable, Text> record) {
return record._2.toString();
}

// Project name convention: the parent directory name of the split's file.
private static String getProjectName(FileSplit fileSplit) {
return fileSplit.getPath().getParent().getName();
}

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment