Example to demonstrate that calling cache() on a SparkPipeline PTable causes Avro records to be reused.

Actual output with cache() (each line is "<key>,<userId of the paired record>"; every cached record carries the data of the last record read, userId 4):

0,4
1,4
2,4
3,4
4,4

Expected output:

0,0
1,1
2,2
3,3
4,4

The Avro file reader reuses a single Record instance across reads for efficiency; because Spark's cache stores object references rather than copies, every cached Pair ends up pointing at that one reused instance, which holds the last record's fields.
package com.test;
import java.util.Collection;
import java.util.LinkedList;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.io.From;
import org.apache.crunch.io.To;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.util.CrunchTool;
import org.apache.hadoop.util.ToolRunner;
import org.joda.time.DateTime;
public class SparkCacheExample extends CrunchTool {

  public SparkCacheExample() {
    super(false);
  }

  @Override
  public int run(String[] args) throws Exception {
    // args[0] is the Spark connection string, args[1] is the Avro file path.
    Pipeline pipeline = new SparkPipeline(args[0], "SparkCacheExample", getClass(), getConf());

    Schema schema = new Schema.Parser().parse(
        "{\n"
            + "  \"type\": \"record\",\n"
            + "  \"name\": \"User\",\n"
            + "  \"namespace\": \"com.test\",\n"
            + "  \"doc\": \"A user record\",\n"
            + "  \"fields\": [\n"
            + "    { \"name\": \"userId\", \"type\": \"string\",\n"
            + "      \"mapping\": { \"type\": \"key\", \"value\": \"0\" } },\n"
            + "    { \"name\": \"creationDate\", \"type\": \"long\",\n"
            + "      \"mapping\": { \"type\": \"column\", \"value\": \"meta:creationDate\" } }\n"
            + "  ]\n"
            + "}");

    // Build five input records with userIds "0" through "4" and write them
    // out as an Avro file.
    Collection<Record> inputs = new LinkedList<Record>();
    for (int i = 0; i < 5; i++) {
      Record r = new GenericData.Record(schema);
      r.put("userId", Integer.toString(i));
      r.put("creationDate", new DateTime().getMillis());
      inputs.add(r);
    }
    pipeline.write(pipeline.create(inputs, Avros.generics(schema)), To.avroFile(args[1]));
    pipeline.run();

    // Perform cleanup so that Crunch does not reuse the existing Target.
    pipeline.cleanup(true);

    // Read the records back, key each one by its userId, and cache the
    // resulting PTable. The emitted records are references to the reader's
    // reused Record instance.
    PCollection<Record> pCollectionFromHDFS =
        pipeline.read(From.avroFile(args[1], Avros.generics(schema)));
    PTable<String, Record> cachedPTable = pCollectionFromHDFS.parallelDo(
        new DoFn<Record, Pair<String, Record>>() {
          @Override
          public void process(Record input, Emitter<Pair<String, Record>> emitter) {
            String userId = input.get("userId").toString();
            emitter.emit(Pair.of(userId, input));
          }
        }, Avros.tableOf(Avros.strings(), Avros.generics(schema))).cache();

    // Emit "<key>,<userId of the cached record>" for every cached pair.
    // With the reuse bug, the second component is "4" for every key.
    Collection<String> keyValueIds = cachedPTable.parallelDo(
        new DoFn<Pair<String, Record>, String>() {
          @Override
          public void process(Pair<String, Record> input, Emitter<String> emitter) {
            emitter.emit(input.first() + "," + input.second().get("userId").toString());
          }
        }, Avros.strings()).asCollection().getValue();
    pipeline.done();

    for (String s : keyValueIds) {
      System.out.println(s);
    }
    return 0;
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new SparkCacheExample(), args);
  }
}
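
A possible workaround, not part of the original gist, is to deep-copy each record before it is emitted into the PTable that will be cached, so each cached Pair owns an independent Record rather than a reference to the reader's reused instance. A minimal sketch follows; the class name DeepCopyKeyByUserIdFn is hypothetical:

package com.test;

import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;

// Sketch of a keying DoFn that deep-copies each record before emitting,
// so cached Pairs do not all share the reader's reused Record instance.
public class DeepCopyKeyByUserIdFn extends DoFn<Record, Pair<String, Record>> {
  @Override
  public void process(Record input, Emitter<Pair<String, Record>> emitter) {
    // GenericData.Record's copy constructor with deepCopy = true creates an
    // independent copy that is safe to hold after the reader moves on.
    Record copy = new GenericData.Record(input, true);
    emitter.emit(Pair.of(copy.get("userId").toString(), copy));
  }
}

Substituting this DoFn for the anonymous one in run() would be expected to produce the second output above (0,0 through 4,4), since each cached Pair then holds its own copy of the record's data.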