Created
January 1, 2013 20:02
-
-
Save anonymous/4429628 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.giraph; | |
import org.apache.giraph.conf.GiraphClasses; | |
import org.apache.giraph.conf.GiraphConfiguration; | |
import org.apache.giraph.conf.GiraphConstants; | |
import org.apache.giraph.graph.EdgeListVertex; | |
import org.apache.giraph.graph.GiraphJob; | |
import org.apache.giraph.graph.Vertex; | |
import org.apache.giraph.graph.VertexReader; | |
import org.apache.giraph.graph.partition.HashMasterPartitioner; | |
import org.apache.giraph.io.GeneratedVertexInputFormat; | |
import org.apache.hadoop.io.IntWritable; | |
import org.apache.hadoop.mapreduce.InputSplit; | |
import org.apache.hadoop.mapreduce.TaskAttemptContext; | |
import org.junit.Test; | |
import java.io.IOException; | |
import static org.junit.Assert.assertTrue; | |
public class TestOutOfCoreConcurrency extends BspCase { | |
public TestOutOfCoreConcurrency() { | |
super(TestOutOfCoreConcurrency.class.getName()); | |
} | |
public static class TestVertex extends EdgeListVertex<IntWritable, | |
IntWritable, | |
IntWritable, IntWritable> { | |
private static int NUM_SUPERSTEPS = 10; | |
@Override | |
public void compute(Iterable<IntWritable> messages) throws IOException { | |
if (getSuperstep() > 0) { | |
setValue(new IntWritable(getValue().get() + 1)); | |
if (getValue().get() != getSuperstep()) { | |
throw new IOException("Wrong vertex value!"); | |
} | |
if (getSuperstep() == NUM_SUPERSTEPS) { | |
voteToHalt(); | |
} | |
} | |
} | |
} | |
public static class TestVertexInputFormat | |
extends GeneratedVertexInputFormat<IntWritable, IntWritable, IntWritable, | |
IntWritable> { | |
@Override | |
public VertexReader<IntWritable, IntWritable, IntWritable, IntWritable> | |
createVertexReader(InputSplit split, TaskAttemptContext context) | |
throws IOException { | |
return new VertexReader<IntWritable, IntWritable, IntWritable, | |
IntWritable>() { | |
private int verticesToRead = 100; | |
@Override | |
public void initialize(InputSplit inputSplit, | |
TaskAttemptContext context) | |
throws IOException, InterruptedException { } | |
@Override | |
public boolean nextVertex() throws IOException, InterruptedException { | |
return (verticesToRead > 0); | |
} | |
@Override | |
public Vertex<IntWritable, IntWritable, IntWritable, IntWritable> | |
getCurrentVertex() throws IOException, InterruptedException { | |
TestVertex vertex = new TestVertex(); | |
vertex.initialize(new IntWritable(verticesToRead), | |
new IntWritable(0)); | |
--verticesToRead; | |
return vertex; | |
} | |
@Override | |
public void close() throws IOException { } | |
@Override | |
public float getProgress() throws IOException, InterruptedException { | |
return 100.0f - verticesToRead; | |
} | |
}; | |
} | |
} | |
@Test | |
public void testInMemorySingleThread() throws IOException, | |
InterruptedException, ClassNotFoundException { | |
testOutOfCoreConcurrency(50, 50, 1); | |
} | |
@Test | |
public void testInMemoryMultipleThreads() throws IOException, | |
InterruptedException, ClassNotFoundException { | |
testOutOfCoreConcurrency(50, 50, 10); | |
} | |
@Test | |
public void testOutOfCoreSingleThread() throws IOException, | |
InterruptedException, ClassNotFoundException { | |
testOutOfCoreConcurrency(50, 1, 1); | |
} | |
@Test | |
public void testOutOfCoreMultipleThreads() throws IOException, | |
InterruptedException, ClassNotFoundException { | |
testOutOfCoreConcurrency(50, 1, 10); | |
} | |
public void testOutOfCoreConcurrency(int numPartitions, | |
int maxPartitionsInMemory, | |
int numComputeThreads) throws | |
IOException, | |
InterruptedException, ClassNotFoundException { | |
GiraphClasses<IntWritable, IntWritable, IntWritable, | |
IntWritable> classes = | |
new GiraphClasses<IntWritable, IntWritable, IntWritable, IntWritable>(); | |
classes.setVertexClass(TestVertex.class); | |
classes.setVertexInputFormatClass(TestVertexInputFormat.class); | |
GiraphJob job = prepareJob(getCallingMethodName(), classes); | |
GiraphConfiguration conf = job.getConfiguration(); | |
conf.setInt(HashMasterPartitioner.USER_PARTITION_COUNT, numPartitions); | |
conf.setNumComputeThreads(numComputeThreads); | |
if (maxPartitionsInMemory < numPartitions) { | |
conf.setBoolean(GiraphConstants.USE_OUT_OF_CORE_GRAPH, true); | |
conf.setInt(GiraphConstants.MAX_PARTITIONS_IN_MEMORY, | |
maxPartitionsInMemory); | |
} | |
assertTrue(job.run(true)); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment