// FileStreamTest.java
package voldemort.performance;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;

import junit.framework.TestCase;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;

import voldemort.ServerTestUtils;
import voldemort.TestUtils;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.cluster.Cluster;
import voldemort.cluster.Node;
import voldemort.server.VoldemortServer;
import voldemort.store.Store;
import voldemort.store.StoreDefinition;
import voldemort.store.readonly.ReadOnlyStorageEngine;
import voldemort.store.readonly.ReadOnlyStorageFormat;
import voldemort.store.readonly.ReadOnlyStorageMetadata;
import voldemort.store.socket.SocketStoreFactory;
import voldemort.store.socket.clientrequest.ClientRequestExecutorPool;
import voldemort.utils.ByteArray;
import voldemort.utils.Pair;
import voldemort.utils.RebalanceUtils;
import voldemort.utils.Utils;
import voldemort.xml.StoreDefinitionsMapper;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
 * Tests streaming of read-only store partition files: dummy chunk files are
 * generated on each node, swapped in, and then fetched back through the
 * {@link AdminClient}.
 */
@RunWith(Parameterized.class)
public class FileStreamTest extends TestCase {
    private static String storesXmlfile = "test/common/voldemort/config/stores.xml";

    private SocketStoreFactory socketStoreFactory = new ClientRequestExecutorPool(2,
                                                                                  10000,
                                                                                  100000,
                                                                                  32 * 1024);
    private List<StoreDefinition> storeDefs;
    private VoldemortServer[] servers;
    private Cluster cluster;
    private AdminClient adminClient;

    private final boolean useNio;

    public FileStreamTest(boolean useNio) {
        this.useNio = useNio;
    }
    @Parameters
    public static Collection<Object[]> configs() {
        return Arrays.asList(new Object[][] { { true } });
    }
    @Override
    @Before
    public void setUp() throws IOException {
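        // Two-node local cluster with one partition per node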
        cluster = ServerTestUtils.getLocalCluster(2, new int[][] { { 0 }, { 1 } });
        servers = new VoldemortServer[2];
        storeDefs = new StoreDefinitionsMapper().readStoreList(new File(storesXmlfile));
        servers[0] = ServerTestUtils.startVoldemortServer(socketStoreFactory,
                                                          ServerTestUtils.createServerConfig(useNio,
                                                                                             0,
                                                                                             TestUtils.createTempDir()
                                                                                                      .getAbsolutePath(),
                                                                                             null,
                                                                                             storesXmlfile,
                                                                                             new Properties()),
                                                          cluster);
        servers[1] = ServerTestUtils.startVoldemortServer(socketStoreFactory,
                                                          ServerTestUtils.createServerConfig(useNio,
                                                                                             1,
                                                                                             TestUtils.createTempDir()
                                                                                                      .getAbsolutePath(),
                                                                                             null,
                                                                                             storesXmlfile,
                                                                                             new Properties()),
                                                          cluster);
        adminClient = ServerTestUtils.getAdminClient(cluster);
    }
    @Override
    @After
    public void tearDown() throws IOException, InterruptedException {
        adminClient.stop();
        for(VoldemortServer server: servers) {
            ServerTestUtils.stopVoldemortServer(server);
        }
        socketStoreFactory.close();
    }
    private VoldemortServer getVoldemortServer(int nodeId) {
        return servers[nodeId];
    }

    private AdminClient getAdminClient() {
        return adminClient;
    }

    private Store<ByteArray, byte[], byte[]> getStore(int nodeID, String storeName) {
        Store<ByteArray, byte[], byte[]> store = getVoldemortServer(nodeID).getStoreRepository()
                                                                           .getStorageEngine(storeName);
        assertNotNull("Store '" + storeName + "' should not be null", store);
        return store;
    }
    @Test
    public void testFetchPartitionFiles() throws IOException {
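        // 1 chunk per bucket, version 1, 1200-byte index files, 12 MB data files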
        generateAndFetchFiles(1, 1, 1200, 12000000);
    }
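    /**
     * Writes a READONLY_V2 .metadata file plus dummy chunk files named
     * "partitionId_replicaType_chunkId.index/.data" into the given version
     * directory.
     */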
    private void generateROFiles(int numChunks,
                                 long indexSize,
                                 long dataSize,
                                 HashMap<Integer, List<Integer>> buckets,
                                 File versionDir) throws IOException {
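        // Write out the .metadata file declaring the READONLY_V2 storage format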
        ReadOnlyStorageMetadata metadata = new ReadOnlyStorageMetadata();
        metadata.add(ReadOnlyStorageMetadata.FORMAT, ReadOnlyStorageFormat.READONLY_V2.getCode());

        File metadataFile = new File(versionDir, ".metadata");
        BufferedWriter writer = new BufferedWriter(new FileWriter(metadataFile));
        writer.write(metadata.toJsonString());
        writer.close();
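        // Create a dummy index and data file for every (replica, partition, chunk)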
        for(Entry<Integer, List<Integer>> entry: buckets.entrySet()) {
            int replicaType = entry.getKey();
            for(int partitionId: entry.getValue()) {
                for(int chunkId = 0; chunkId < numChunks; chunkId++) {
                    System.out.println("Writing replica " + replicaType + " - partition "
                                       + partitionId + " - chunk " + chunkId);
                    File index = new File(versionDir, Integer.toString(partitionId) + "_"
                                                      + Integer.toString(replicaType) + "_"
                                                      + Integer.toString(chunkId) + ".index");
                    File data = new File(versionDir, Integer.toString(partitionId) + "_"
                                                     + Integer.toString(replicaType) + "_"
                                                     + Integer.toString(chunkId) + ".data");

                    // Fill the index and data files with dummy bytes
                    FileOutputStream dataOs = new FileOutputStream(data);
                    for(int i = 0; i < dataSize; i++)
                        dataOs.write(i);
                    dataOs.close();

                    FileOutputStream indexOs = new FileOutputStream(index);
                    for(int i = 0; i < indexSize; i++)
                        indexOs.write(i);
                    indexOs.close();
                }
            }
        }
    }
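    /**
     * For every node: generates dummy chunk files for all of the node's
     * primary and replica buckets, swaps the new version in, verifies that
     * the chunked file set was mmap-ed with the expected buckets, then
     * fetches all partition files through the admin client and checks their
     * count and sizes.
     */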
    @SuppressWarnings("unchecked")
    private void generateAndFetchFiles(int numChunks, long versionId, long indexSize, long dataSize)
            throws IOException {
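        // Compute the (replica type, partition id) buckets owned by every node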
        Map<Integer, Set<Pair<Integer, Integer>>> buckets = RebalanceUtils.getNodeIdToAllPartitions(cluster,
                                                                                                    Lists.newArrayList(RebalanceUtils.getStoreDefinitionWithName(storeDefs,
                                                                                                                                                                 "test-readonly-fetchfiles")),
                                                                                                    true);
        for(Node node: cluster.getNodes()) {
            ReadOnlyStorageEngine store = (ReadOnlyStorageEngine) getStore(node.getId(),
                                                                           "test-readonly-fetchfiles");

            // Create the list of buckets (replica type to partitions) for this node
            Set<Pair<Integer, Integer>> nodeBucketsSet = buckets.get(node.getId());
            HashMap<Integer, List<Integer>> nodeBuckets = RebalanceUtils.flattenPartitionTuples(nodeBucketsSet);

            // Split the buckets into primary and replica buckets
            HashMap<Integer, List<Integer>> primaryNodeBuckets = Maps.newHashMap();
            primaryNodeBuckets.put(0, nodeBuckets.get(0));
            int primaryPartitions = nodeBuckets.get(0).size();

            HashMap<Integer, List<Integer>> replicaNodeBuckets = Maps.newHashMap(nodeBuckets);
            replicaNodeBuckets.remove(0);
            int replicaPartitions = 0;
            for(List<Integer> partitions: replicaNodeBuckets.values()) {
                replicaPartitions += partitions.size();
            }

            // Generate the data...
            File newVersionDir = new File(store.getStoreDirPath(), "version-"
                                                                   + Long.toString(versionId));
            Utils.mkdirs(newVersionDir);
            generateROFiles(numChunks, indexSize, dataSize, nodeBuckets, newVersionDir);

            // Swap it in...
            store.swapFiles(newVersionDir.getAbsolutePath());

            // Check that everything got mmap-ed correctly...
            HashMap<Object, Integer> chunkIdToNumChunks = store.getChunkedFileSet()
                                                               .getChunkIdToNumChunks();
            for(Object bucket: chunkIdToNumChunks.keySet()) {
                Pair<Integer, Integer> partitionToReplicaBucket = (Pair<Integer, Integer>) bucket;
                Pair<Integer, Integer> replicaToPartitionBucket = Pair.create(partitionToReplicaBucket.getSecond(),
                                                                              partitionToReplicaBucket.getFirst());
                assertTrue(nodeBucketsSet.contains(replicaToPartitionBucket));
            }

            // Fetch all the partition files...
            File tempDir = TestUtils.createTempDir();
            getAdminClient().fetchPartitionFiles(node.getId(),
                                                 "test-readonly-fetchfiles",
                                                 nodeBuckets,
                                                 tempDir.getAbsolutePath(),
                                                 null);

            // Check that every index and data file came back with the right size
            assertEquals(2 * (primaryPartitions + replicaPartitions) * numChunks,
                         tempDir.list().length);
            for(Entry<Integer, List<Integer>> entry: nodeBuckets.entrySet()) {
                int replicaType = entry.getKey();
                for(int partitionId: entry.getValue()) {
                    for(int chunkId = 0; chunkId < numChunks; chunkId++) {
                        File indexFile = new File(tempDir, Integer.toString(partitionId) + "_"
                                                           + Integer.toString(replicaType) + "_"
                                                           + Integer.toString(chunkId) + ".index");
                        File dataFile = new File(tempDir, Integer.toString(partitionId) + "_"
                                                          + Integer.toString(replicaType) + "_"
                                                          + Integer.toString(chunkId) + ".data");
                        assertTrue(indexFile.exists());
                        assertTrue(dataFile.exists());
                        assertEquals(indexSize, indexFile.length());
                        assertEquals(dataSize, dataFile.length());
                    }
                }
            }
        }
    }
}