Skip to content

Instantly share code, notes, and snippets.

Created January 1, 2013 20:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/4429628 to your computer and use it in GitHub Desktop.
Save anonymous/4429628 to your computer and use it in GitHub Desktop.
package org.apache.giraph;
import org.apache.giraph.conf.GiraphClasses;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.graph.EdgeListVertex;
import org.apache.giraph.graph.GiraphJob;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexReader;
import org.apache.giraph.graph.partition.HashMasterPartitioner;
import org.apache.giraph.io.GeneratedVertexInputFormat;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.junit.Test;
import java.io.IOException;
import static org.junit.Assert.assertTrue;
public class TestOutOfCoreConcurrency extends BspCase {
public TestOutOfCoreConcurrency() {
super(TestOutOfCoreConcurrency.class.getName());
}
public static class TestVertex extends EdgeListVertex<IntWritable,
IntWritable,
IntWritable, IntWritable> {
private static int NUM_SUPERSTEPS = 10;
@Override
public void compute(Iterable<IntWritable> messages) throws IOException {
if (getSuperstep() > 0) {
setValue(new IntWritable(getValue().get() + 1));
if (getValue().get() != getSuperstep()) {
throw new IOException("Wrong vertex value!");
}
if (getSuperstep() == NUM_SUPERSTEPS) {
voteToHalt();
}
}
}
}
public static class TestVertexInputFormat
extends GeneratedVertexInputFormat<IntWritable, IntWritable, IntWritable,
IntWritable> {
@Override
public VertexReader<IntWritable, IntWritable, IntWritable, IntWritable>
createVertexReader(InputSplit split, TaskAttemptContext context)
throws IOException {
return new VertexReader<IntWritable, IntWritable, IntWritable,
IntWritable>() {
private int verticesToRead = 100;
@Override
public void initialize(InputSplit inputSplit,
TaskAttemptContext context)
throws IOException, InterruptedException { }
@Override
public boolean nextVertex() throws IOException, InterruptedException {
return (verticesToRead > 0);
}
@Override
public Vertex<IntWritable, IntWritable, IntWritable, IntWritable>
getCurrentVertex() throws IOException, InterruptedException {
TestVertex vertex = new TestVertex();
vertex.initialize(new IntWritable(verticesToRead),
new IntWritable(0));
--verticesToRead;
return vertex;
}
@Override
public void close() throws IOException { }
@Override
public float getProgress() throws IOException, InterruptedException {
return 100.0f - verticesToRead;
}
};
}
}
@Test
public void testInMemorySingleThread() throws IOException,
InterruptedException, ClassNotFoundException {
testOutOfCoreConcurrency(50, 50, 1);
}
@Test
public void testInMemoryMultipleThreads() throws IOException,
InterruptedException, ClassNotFoundException {
testOutOfCoreConcurrency(50, 50, 10);
}
@Test
public void testOutOfCoreSingleThread() throws IOException,
InterruptedException, ClassNotFoundException {
testOutOfCoreConcurrency(50, 1, 1);
}
@Test
public void testOutOfCoreMultipleThreads() throws IOException,
InterruptedException, ClassNotFoundException {
testOutOfCoreConcurrency(50, 1, 10);
}
public void testOutOfCoreConcurrency(int numPartitions,
int maxPartitionsInMemory,
int numComputeThreads) throws
IOException,
InterruptedException, ClassNotFoundException {
GiraphClasses<IntWritable, IntWritable, IntWritable,
IntWritable> classes =
new GiraphClasses<IntWritable, IntWritable, IntWritable, IntWritable>();
classes.setVertexClass(TestVertex.class);
classes.setVertexInputFormatClass(TestVertexInputFormat.class);
GiraphJob job = prepareJob(getCallingMethodName(), classes);
GiraphConfiguration conf = job.getConfiguration();
conf.setInt(HashMasterPartitioner.USER_PARTITION_COUNT, numPartitions);
conf.setNumComputeThreads(numComputeThreads);
if (maxPartitionsInMemory < numPartitions) {
conf.setBoolean(GiraphConstants.USE_OUT_OF_CORE_GRAPH, true);
conf.setInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY,
maxPartitionsInMemory);
}
assertTrue(job.run(true));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment