Skip to content

Instantly share code, notes, and snippets.

@gustavoanatoly
Last active June 28, 2017 19:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gustavoanatoly/b7a29062a45201168362401346cede61 to your computer and use it in GitHub Desktop.
Save gustavoanatoly/b7a29062a45201168362401346cede61 to your computer and use it in GitHub Desktop.
Reproducing orphan blobstore
@Test
public void testOrphanBlobStore() throws Exception {
int instances = 2;
long cleanupInterval = 1000l;
BlobClient blobClient = null;
BlobStoreService blobStoreService = null;
BlobServer[] blobServers = new BlobServer[instances];
BlobLibraryCacheManager[] managers = new BlobLibraryCacheManager[instances];
InetSocketAddress[] addresses = new InetSocketAddress[instances];
Random random = new Random();
try {
final Configuration config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
config.setString(CoreOptions.STATE_BACKEND, "FILESYSTEM");
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH,
temporaryFolder.getRoot().getPath() + "/tmp-data");
blobStoreService = BlobUtils.createBlobStoreFromConfig(config);
for (int i = 0; i < instances; i++) {
blobServers[i] = new BlobServer(config, blobStoreService);
addresses[i] = new InetSocketAddress("0.0.0.0", blobServers[i].getPort());
managers[i] = new BlobLibraryCacheManager(blobServers[i], cleanupInterval);
}
blobClient = new BlobClient(addresses[0], config);
byte[] blob_1 = new byte[1024];
byte[] blob_2 = new byte[1024];
random.nextBytes(blob_1);
random.nextBytes(blob_2);
BlobKey key_1 = blobClient.put(blob_1);
BlobKey key_2 = blobClient.put(blob_2);
List<BlobKey> orphans = new ArrayList<>();
orphans.add(key_1);
orphans.add(key_2);
JobID jobOrphan = new JobID();
managers[0].registerJob(jobOrphan, orphans, Collections.<URL>emptyList());
assertTrue(managers[0].getFile(key_1).exists());
assertTrue(managers[0].getFile(key_2).exists());
assertTrue(managers[1].getFile(key_1).exists());
assertTrue(managers[1].getFile(key_2).exists());
// server restarted and manager recovered to another server
blobServers[0] = new BlobServer(config, blobStoreService);
managers[0] = new BlobLibraryCacheManager(blobServers[1], cleanupInterval);
byte[] blob_3 = new byte[1024];
random.nextBytes(blob_3);
BlobKey key_3 = blobClient.put(blob_3);
List<BlobKey> blobs = new ArrayList<>();
blobs.add(key_3);
JobID job = new JobID();
managers[0].registerJob(job, blobs, Collections.<URL>emptyList());
/**
* BlobLibraryCacheManager#getNumberOfReferencesByKey was implemeted
* to check if there are reference count equals to 0 and there are
* blobKey.
*/
assertTrue(managers[0].getFile(key_1).exists());
assertTrue(managers[0].getNumberOfReferencesByKey(key_1) == 0);
assertTrue(managers[0].getFile(key_2).exists());
assertTrue(managers[0].getNumberOfReferencesByKey(key_2) == 0);
assertTrue(managers[0].getFile(key_3).exists());
assertTrue(managers[0].getNumberOfReferencesByKey(key_3) == 1);
managers[0].unregisterJob(jobOrphan);
managers[0].unregisterJob(job);
// Cleaning up
Thread.sleep(cleanupInterval + 1);
// Checking orphan references
//----------------------------------------------------------------------
assertTrue(managers[0].getNumberOfReferencesByKey(key_1) == 0);
assertTrue(managers[0].getFile(key_1).exists());
assertTrue(managers[0].getNumberOfReferencesByKey(key_2) == 0);
assertTrue(managers[0].getFile(key_2).exists());
//----------------------------------------------------------------------
int fileNotFound = 0;
try {
assertTrue(managers[0].getNumberOfReferencesByKey(key_3) == 0);
managers[0].getFile(key_3).exists();
} catch (IOException ex) {
fileNotFound++;
}
// File not found, was removed correctly by cleanup
assertEquals(1, fileNotFound);
// Checking files after cleanup. This files should be re-register
assertTrue(managers[0].getFile(key_1).exists());
assertTrue(managers[0].getFile(key_2).exists());
} finally {
if (blobClient != null && !blobClient.isClosed()) {
blobClient.close();
}
for (BlobServer bs : blobServers) {
if (bs != null && !bs.isShutdown()) {
bs.close();
}
}
for (BlobLibraryCacheManager blcm : managers) {
if (blcm != null) {
blcm.shutdown();
}
}
if (blobStoreService != null) {
blobStoreService.closeAndCleanupAllData();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment