Example demonstrating that a Crunch SparkPipeline fails with a NullPointerException when Aggregators are used on Avro inputs (the isSpark flag below toggles the same job between SparkPipeline and MRPipeline).
15/10/05 16:02:33 WARN TaskSetManager: Lost task 3.0 in stage 0.0 (TID 0, 123.domain.xyz): java.lang.NullPointerException
at org.apache.crunch.impl.mr.run.UniformHashPartitioner.getPartition(UniformHashPartitioner.java:32)
at org.apache.crunch.impl.spark.fn.PartitionedMapOutputFunction.call(PartitionedMapOutputFunction.java:62)
at org.apache.crunch.impl.spark.fn.PartitionedMapOutputFunction.call(PartitionedMapOutputFunction.java:35)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1002)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
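
The trace shows the failure inside Crunch's UniformHashPartitioner while Spark's ExternalSorter is writing shuffle output, i.e. in the shuffle machinery rather than in user code. The Person class read below is a generated Avro specific record that is not included in the gist; a minimal schema consistent with its usage (only getId() is called) might look like the following — the namespace and the id field's type are assumptions:

{
  "type": "record",
  "name": "Person",
  "namespace": "com.test",
  "fields": [
    {"name": "id", "type": "long"}
  ]
}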
package com.test;

import java.util.Collection;

import org.apache.crunch.Aggregator;
import org.apache.crunch.Pipeline;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.util.CrunchTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class SparkAggregators extends CrunchTool {

  private static final long serialVersionUID = 2412214301679720140L;

  public SparkAggregators() {
    // super(true) skips MRPipeline creation; the actual pipeline is built in run()
    super(true);
  }

  @Override
  public int run(String[] args) throws Exception {
    String sparkConnect = args[0];
    String input = args[1];

    Configuration conf = getConf();
    Pipeline pipeline;
    if (conf.getBoolean("isSpark", true)) {
      pipeline = new SparkPipeline(sparkConnect, "SparkAggregators", getClass(), conf);
    } else {
      pipeline = new MRPipeline(getClass(), "SparkAggregators", conf);
    }

    // Aggregate the Avro input down to its first 20 records. With the
    // SparkPipeline this triggers the NullPointerException above; the
    // MRPipeline branch is provided for comparison.
    Aggregator<Person> first20 = Aggregators.FIRST_N(20);
    Collection<Person> first20Persons =
        pipeline.read(From.avroFile(input, Person.class)).aggregate(first20).asCollection().getValue();
    pipeline.done();

    for (Person p : first20Persons) {
      System.out.println(p.getId());
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new SparkAggregators(), args));
  }
}
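
A run might look like the following, assuming the class is packaged with its dependencies into a job jar; the jar name, Spark master URL, and input path are placeholders. Because the tool is launched through ToolRunner, -DisSpark=false (a generic Hadoop option) switches the same invocation to the MRPipeline branch, in which case the first argument is read but otherwise unused:

hadoop jar spark-aggregators-job.jar com.test.SparkAggregators \
    spark://master:7077 /data/persons.avro

hadoop jar spark-aggregators-job.jar com.test.SparkAggregators \
    -DisSpark=false unused /data/persons.avro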