Skip to content

Instantly share code, notes, and snippets.

@cykl
Last active January 6, 2016 14:10
Show Gist options
  • Save cykl/7f71c1a3dff3f881f3ba to your computer and use it in GitHub Desktop.
Save cykl/7f71c1a3dff3f881f3ba to your computer and use it in GitHub Desktop.
[Crunch] POC: Avoid the MaterializableIterable dance by storing all the PipelineResults in the pipeline instance.
package fr.mediametrie.internet.weball.crunch;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.mr.MRPipelineExecution;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.MoreExecutors;
/**
 * A {@link MRPipeline} remembering all the {@link PipelineResult}s it has created.
 *
 * <p>
 * Crunch's API makes it difficult to get access to the Hadoop counters when things like {@literal materialize()} are
 * used:
 * <ul>
 * <li>If we stick to the public API, it is not possible to get access to the
 * PipelineResult possibly created by a call to materialize. One has to cast
 * the iterable to MaterializableIterable.</li>
 * <li>Code dealing with the iterable most likely does not care about counters
 * at all, but must extract the PipelineResult and pass it to someone else so as
 * not to lose it.</li>
 * </ul>
 *
 * <p>
 * Every execution started through {@link #runAsync()} gets a completion listener that records its
 * {@link PipelineResult}; the recorded results can later be read back with {@link #getAllPipelineResults()}.
 * Results are kept in a {@link ConcurrentLinkedQueue}, so recording (on the thread completing an execution) and
 * reading (on any other thread) do not need external synchronization.
 */
public class HyperthymesticMRPipeline extends MRPipeline {

    /** Results recorded by the listener installed in {@link #runAsync()}, in the order they were recorded. */
    private final ConcurrentLinkedQueue<PipelineResult> allPipelineResults = new ConcurrentLinkedQueue<>();

    /**
     * @param jarClass class used to locate the job jar; forwarded to {@link MRPipeline}
     */
    public HyperthymesticMRPipeline(Class<?> jarClass) {
        super(jarClass);
    }

    /**
     * @param jarClass class used to locate the job jar; forwarded to {@link MRPipeline}
     * @param name pipeline name; forwarded to {@link MRPipeline}
     */
    public HyperthymesticMRPipeline(Class<?> jarClass, String name) {
        super(jarClass, name);
    }

    /**
     * @param jarClass class used to locate the job jar; forwarded to {@link MRPipeline}
     * @param conf Hadoop configuration; forwarded to {@link MRPipeline}
     */
    public HyperthymesticMRPipeline(Class<?> jarClass, Configuration conf) {
        super(jarClass, conf);
    }

    /**
     * @param jarClass class used to locate the job jar; forwarded to {@link MRPipeline}
     * @param name pipeline name; forwarded to {@link MRPipeline}
     * @param conf Hadoop configuration; forwarded to {@link MRPipeline}
     */
    public HyperthymesticMRPipeline(Class<?> jarClass, String name, Configuration conf) {
        super(jarClass, name, conf);
    }

    /**
     * Gets all the {@link PipelineResult}s recorded by this pipeline since its creation.
     *
     * @return an immutable snapshot of the recorded results, in recording order; later executions do not
     *         affect a list that has already been returned
     */
    public List<PipelineResult> getAllPipelineResults() {
        return ImmutableList.copyOf(allPipelineResults);
    }

    /**
     * Starts the pipeline asynchronously and registers a listener that records the execution's
     * {@link PipelineResult} once it completes.
     *
     * @return the execution handle returned by {@link MRPipeline#runAsync()}
     */
    @Override
    public MRPipelineExecution runAsync() {
        MRPipelineExecution mrPipelineExecution = super.runAsync();
        // NOTE(review): sameThreadExecutor() runs the listener on the thread that completes the execution.
        // This is presumably relied upon so the result is recorded before anyone waiting on the execution
        // resumes; confirm before switching executors. (Guava >= 18 names this directExecutor().)
        mrPipelineExecution.addListener(() -> {
            PipelineResult pipelineResult = mrPipelineExecution.getResult();
            // getResult() is not guaranteed to be non-null here (NOTE(review): e.g. an execution that failed
            // before building a result). ConcurrentLinkedQueue.offer throws NullPointerException on null,
            // which would otherwise blow up inside the listener, so skip missing results.
            if (pipelineResult != null) {
                allPipelineResults.offer(pipelineResult);
            }
        }, MoreExecutors.sameThreadExecutor());
        return mrPipelineExecution;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment