Created
October 12, 2016 08:08
-
-
Save amit-jain/f2f1a6eee389470fe2beddbf285c1870 to your computer and use it in GitHub Desktop.
OAK-4837 Tests
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.apache.jackrabbit.oak.plugins.blob; | |
import java.io.ByteArrayInputStream; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.FileNotFoundException; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Random; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
import javax.annotation.Nonnull; | |
import javax.annotation.Nullable; | |
import com.google.common.cache.CacheLoader; | |
import com.google.common.collect.Lists; | |
import com.google.common.collect.Maps; | |
import com.google.common.io.Files; | |
import com.google.common.util.concurrent.AbstractListeningExecutorService; | |
import com.google.common.util.concurrent.FutureCallback; | |
import com.google.common.util.concurrent.Futures; | |
import com.google.common.util.concurrent.ListenableFuture; | |
import org.apache.commons.io.FileUtils; | |
import org.apache.jackrabbit.core.data.AbstractDataRecord; | |
import org.apache.jackrabbit.core.data.DataIdentifier; | |
import org.apache.jackrabbit.core.data.DataRecord; | |
import org.apache.jackrabbit.core.data.DataStoreException; | |
import org.apache.jackrabbit.core.data.util.NamedThreadFactory; | |
import org.apache.jackrabbit.oak.spi.blob.SharedBackend; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* Abstract class for DataStore cache related tests. | |
*/ | |
public class AbstractDataStoreCacheTest { | |
static final Logger LOG = LoggerFactory.getLogger(AbstractDataStoreCacheTest.class); | |
static class TestStagingUploader implements StagingUploader { | |
private final File root; | |
public TestStagingUploader(File dir) { | |
this.root = new File(dir, "datastore"); | |
root.mkdirs(); | |
} | |
@Override public void write(String id, File f) throws DataStoreException { | |
try { | |
File move = getFile(id, root); | |
move.getParentFile().mkdirs(); | |
Files.copy(f, move); | |
LOG.info("In TestStagingUploader after write [{}]", move); | |
} catch (IOException e) { | |
throw new DataStoreException(e); | |
} | |
} | |
public File read(String id) { | |
return getFile(id, root); | |
} | |
} | |
static class TestCacheLoader<S, I> extends CacheLoader<String, FileInputStream> { | |
private final File root; | |
public TestCacheLoader(File dir) { | |
this.root = new File(dir, "datastore"); | |
root.mkdirs(); | |
} | |
public void write(String id, File f) throws DataStoreException { | |
try { | |
File move = getFile(id, root); | |
move.getParentFile().mkdirs(); | |
Files.copy(f, move); | |
LOG.info("In TestCacheLoader after write [{}], [{}]", id, move); | |
} catch (IOException e) { | |
throw new DataStoreException(e); | |
} | |
} | |
@Override public FileInputStream load(@Nonnull String key) throws Exception { | |
return FileUtils.openInputStream(getFile(key, root)); | |
} | |
} | |
static class TestPoolExecutor extends ThreadPoolExecutor { | |
private final CountDownLatch beforeLatch; | |
private final CountDownLatch afterLatch; | |
TestPoolExecutor(int threads, CountDownLatch beforeLatch, CountDownLatch afterLatch) { | |
super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), | |
new NamedThreadFactory("oak-async-thread")); | |
this.beforeLatch = beforeLatch; | |
this.afterLatch = afterLatch; | |
} | |
@Override public void beforeExecute(Thread t, Runnable command) { | |
try { | |
LOG.trace("Before execution....waiting for latch"); | |
beforeLatch.await(); | |
LOG.trace("Before execution....after acquiring latch"); | |
super.beforeExecute(t, command); | |
LOG.trace("Completed beforeExecute"); | |
} catch (Exception e) { | |
LOG.trace("Error in before execute", e); | |
} | |
} | |
@Override protected void afterExecute(Runnable r, Throwable t) { | |
try { | |
LOG.trace("After execution....counting down latch"); | |
afterLatch.countDown(); | |
LOG.trace("After execution....after counting down latch"); | |
super.afterExecute(r, t); | |
LOG.trace("Completed afterExecute"); | |
} catch (Exception e) { | |
LOG.trace("Error in after execute", e); | |
} | |
} | |
} | |
static class TestExecutor extends AbstractListeningExecutorService { | |
private final CountDownLatch afterLatch; | |
private final ExecutorService delegate; | |
final List<ListenableFuture<Integer>> futures; | |
public TestExecutor(int threads, CountDownLatch beforeLatch, CountDownLatch afterLatch, | |
CountDownLatch afterExecuteLatch) { | |
this.delegate = new TestPoolExecutor(threads, beforeLatch, afterExecuteLatch); | |
this.futures = Lists.newArrayList(); | |
this.afterLatch = afterLatch; | |
} | |
@Override @Nonnull public ListenableFuture<?> submit(@Nonnull Callable task) { | |
LOG.trace("Before submitting to super...."); | |
ListenableFuture<Integer> submit = super.submit(task); | |
LOG.trace("After submitting to super...."); | |
futures.add(submit); | |
Futures.addCallback(submit, new TestFutureCallback<Integer>(afterLatch)); | |
LOG.trace("Added callback"); | |
return submit; | |
} | |
@Override public void execute(@Nonnull Runnable command) { | |
delegate.execute(command); | |
} | |
@Override public void shutdown() { | |
delegate.shutdown(); | |
} | |
@Override @Nonnull public List<Runnable> shutdownNow() { | |
return delegate.shutdownNow(); | |
} | |
@Override public boolean isShutdown() { | |
return delegate.isShutdown(); | |
} | |
@Override public boolean isTerminated() { | |
return delegate.isTerminated(); | |
} | |
@Override public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) | |
throws InterruptedException { | |
return delegate.awaitTermination(timeout, unit); | |
} | |
static class TestFutureCallback<Integer> implements FutureCallback { | |
private final CountDownLatch latch; | |
public TestFutureCallback(CountDownLatch latch) { | |
this.latch = latch; | |
} | |
@Override public void onSuccess(@Nullable Object result) { | |
try { | |
LOG.trace("Waiting for latch in callback"); | |
latch.await(100, TimeUnit.MILLISECONDS); | |
LOG.trace("Acquired latch in onSuccess"); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
@Override public void onFailure(@Nonnull Throwable t) { | |
try { | |
LOG.trace("Waiting for latch onFailure in callback"); | |
latch.await(100, TimeUnit.MILLISECONDS); | |
LOG.trace("Acquired latch in onFailure"); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
// A mock Backend implementation that uses a Map to keep track of what | |
// records have been added and removed, for test purposes only. | |
static class TestMemoryBackend implements SharedBackend { | |
final Map<DataIdentifier, File> _backend = Maps.newHashMap(); | |
@Override public InputStream read(DataIdentifier identifier) throws DataStoreException { | |
try { | |
return new FileInputStream(_backend.get(identifier)); | |
} catch (FileNotFoundException e) { | |
throw new DataStoreException(e); | |
} | |
} | |
@Override public void write(DataIdentifier identifier, File file) | |
throws DataStoreException { | |
if (file != null && file.exists()) { | |
_backend.put(identifier, file); | |
} else { | |
throw new DataStoreException( | |
String.format("file %s of id %s", new Object[] {file, identifier.toString()})); | |
} | |
} | |
@Override public DataRecord getRecord(DataIdentifier id) throws DataStoreException { | |
if (_backend.containsKey(id)) { | |
final File f = _backend.get(id); | |
return new AbstractDataRecord(null, id) { | |
@Override public long getLength() throws DataStoreException { | |
return f.length(); | |
} | |
@Override public InputStream getStream() throws DataStoreException { | |
try { | |
return new FileInputStream(f); | |
} catch (FileNotFoundException e) { | |
e.printStackTrace(); | |
} | |
return null; | |
} | |
@Override public long getLastModified() { | |
return f.lastModified(); | |
} | |
}; | |
} | |
return null; | |
} | |
@Override public Iterator<DataIdentifier> getAllIdentifiers() throws DataStoreException { | |
return _backend.keySet().iterator(); | |
} | |
@Override public Iterator<DataRecord> getAllRecords() throws DataStoreException { | |
return null; | |
} | |
@Override public boolean exists(DataIdentifier identifier) throws DataStoreException { | |
return _backend.containsKey(identifier); | |
} | |
@Override public void close() throws DataStoreException { | |
} | |
@Override public void deleteRecord(DataIdentifier identifier) throws DataStoreException { | |
if (_backend.containsKey(identifier)) { | |
_backend.remove(identifier); | |
} | |
} | |
@Override public void addMetadataRecord(InputStream input, String name) | |
throws DataStoreException { | |
} | |
@Override public void addMetadataRecord(File input, String name) throws DataStoreException { | |
} | |
@Override public DataRecord getMetadataRecord(String name) { | |
return null; | |
} | |
@Override public List<DataRecord> getAllMetadataRecords(String prefix) { | |
return null; | |
} | |
@Override public boolean deleteMetadataRecord(String name) { | |
return false; | |
} | |
@Override public void deleteAllMetadataRecords(String prefix) { | |
} | |
@Override public void init() throws DataStoreException { | |
} | |
} | |
static InputStream randomStream(int seed, int size) { | |
Random r = new Random(seed); | |
byte[] data = new byte[size]; | |
r.nextBytes(data); | |
return new ByteArrayInputStream(data); | |
} | |
private static File getFile(String id, File root) { | |
File file = root; | |
file = new File(file, id.substring(0, 2)); | |
file = new File(file, id.substring(2, 4)); | |
return new File(file, id); | |
} | |
static File copyToFile(InputStream stream, File file) throws IOException { | |
FileUtils.copyInputStreamToFile(stream, file); | |
return file; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.jackrabbit.oak.plugins.blob; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.OutputStream; | |
import java.security.DigestOutputStream; | |
import java.security.MessageDigest; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.ScheduledFuture; | |
import java.util.concurrent.TimeUnit; | |
import com.google.common.collect.Iterators; | |
import com.google.common.io.Closer; | |
import com.google.common.io.Files; | |
import org.apache.commons.io.FileUtils; | |
import org.apache.commons.io.IOUtils; | |
import org.apache.commons.io.output.NullOutputStream; | |
import org.apache.jackrabbit.core.data.DataIdentifier; | |
import org.apache.jackrabbit.core.data.DataRecord; | |
import org.apache.jackrabbit.core.data.DataStoreException; | |
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; | |
import org.apache.jackrabbit.oak.spi.blob.SharedBackend; | |
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider; | |
import org.apache.jackrabbit.oak.stats.StatisticsProvider; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Rule; | |
import org.junit.Test; | |
import org.junit.rules.ExpectedException; | |
import org.junit.rules.TemporaryFolder; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import static org.apache.commons.codec.binary.Hex.encodeHexString; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.assertFalse; | |
import static org.junit.Assert.assertNotNull; | |
import static org.junit.Assert.assertNull; | |
import static org.junit.Assert.assertTrue; | |
/** | |
* Tests for {@link AbstractSharedCachingDataStore} | |
*/ | |
public class CachingDataStoreTest extends AbstractDataStoreCacheTest { | |
private static final Logger LOG = LoggerFactory.getLogger(CachingDataStoreTest.class); | |
private static final String ID_PREFIX = "12345"; | |
@Rule | |
public TemporaryFolder folder = new TemporaryFolder(new File("target")); | |
@Rule | |
public ExpectedException expectedEx = ExpectedException.none(); | |
private final Closer closer = Closer.create(); | |
private File root; | |
private CountDownLatch taskLatch; | |
private CountDownLatch callbackLatch; | |
private CountDownLatch afterExecuteLatch; | |
private ScheduledExecutorService scheduledExecutor; | |
private AbstractSharedCachingDataStore dataStore; | |
@Before | |
public void setup() throws Exception { | |
root = folder.newFolder(); | |
init(1, 64 * 1024 * 1024, 10); | |
} | |
private void init(int i, int cacheSize, int uploadSplit) throws Exception { | |
// create executor | |
taskLatch = new CountDownLatch(1); | |
callbackLatch = new CountDownLatch(1); | |
afterExecuteLatch = new CountDownLatch(i); | |
TestExecutor executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch); | |
// stats | |
ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor(); | |
closer.register(new ExecutorCloser(statsExecutor, 500, TimeUnit.MILLISECONDS)); | |
StatisticsProvider statsProvider = new DefaultStatisticsProvider(statsExecutor); | |
scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); | |
closer.register(new ExecutorCloser(scheduledExecutor, 500, TimeUnit.MILLISECONDS)); | |
dataStore = new AbstractSharedCachingDataStore() { | |
@Override protected SharedBackend createBackend() { | |
return new TestMemoryBackend(); | |
} | |
@Override public int getMinRecordLength() { | |
return 0; | |
} | |
}; | |
dataStore.setStatisticsProvider(statsProvider); | |
dataStore.setCacheSize(cacheSize); | |
dataStore.setStagingSplitPercentage(uploadSplit); | |
dataStore.listeningExecutor = executor; | |
dataStore.schedulerExecutor = scheduledExecutor; | |
dataStore.init(root.getAbsolutePath()); | |
} | |
/** | |
* Add, get, delete when zero cache size. | |
* @throws Exception | |
*/ | |
@Test | |
public void zeroCacheAddGetDelete() throws Exception { | |
dataStore.close(); | |
init(1, 0, 0); | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
String id = getIdForInputStream(f); | |
FileInputStream fin = new FileInputStream(f); | |
closer.register(fin); | |
DataRecord rec = dataStore.addRecord(fin); | |
assertEquals(id, rec.getIdentifier().toString()); | |
assertFile(rec.getStream(), f, folder); | |
rec = dataStore.getRecordIfStored(new DataIdentifier(id)); | |
assertEquals(id, rec.getIdentifier().toString()); | |
assertFile(rec.getStream(), f, folder); | |
assertEquals(1, Iterators.size(dataStore.getAllIdentifiers())); | |
dataStore.deleteRecord(new DataIdentifier(id)); | |
rec = dataStore.getRecordIfStored(new DataIdentifier(id)); | |
assertNull(rec); | |
} | |
/** | |
* Add, get, delete when staging cache is 0. | |
* @throws Exception | |
*/ | |
@Test | |
public void zeroStagingCacheAddGetDelete() throws Exception { | |
dataStore.close(); | |
init(1, 64 * 1024 * 1024, 0); | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
String id = getIdForInputStream(f); | |
FileInputStream fin = new FileInputStream(f); | |
closer.register(fin); | |
DataRecord rec = dataStore.addRecord(fin); | |
assertEquals(id, rec.getIdentifier().toString()); | |
assertFile(rec.getStream(), f, folder); | |
rec = dataStore.getRecordIfStored(new DataIdentifier(id)); | |
assertEquals(id, rec.getIdentifier().toString()); | |
assertFile(rec.getStream(), f, folder); | |
assertEquals(1, Iterators.size(dataStore.getAllIdentifiers())); | |
dataStore.deleteRecord(new DataIdentifier(id)); | |
rec = dataStore.getRecordIfStored(new DataIdentifier(id)); | |
assertNull(rec); | |
} | |
/** | |
* {@link CompositeDataStoreCache#getIfPresent(String)} when no cache. | |
*/ | |
@Test | |
public void getRecordNotAvailable() throws DataStoreException { | |
DataRecord rec = dataStore.getRecordIfStored(new DataIdentifier(ID_PREFIX + 0)); | |
assertNull(rec); | |
} | |
/** | |
* {@link CompositeDataStoreCache#get(String)} when no cache. | |
* @throws IOException | |
*/ | |
@Test | |
public void exists() throws IOException { | |
assertFalse(dataStore.exists(new DataIdentifier(ID_PREFIX + 0))); | |
} | |
/** | |
* Add in datastore. | |
*/ | |
@Test | |
public void addDelete() throws Exception { | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
String id = getIdForInputStream(f); | |
FileInputStream fin = new FileInputStream(f); | |
closer.register(fin); | |
DataRecord rec = dataStore.addRecord(fin); | |
assertEquals(id, rec.getIdentifier().toString()); | |
//start & finish | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(); | |
rec = dataStore.getRecordIfStored(new DataIdentifier(id)); | |
assertNotNull(rec); | |
assertFile(rec.getStream(), f, folder); | |
dataStore.deleteRecord(new DataIdentifier(id)); | |
rec = dataStore.getRecordIfStored(new DataIdentifier(id)); | |
assertNull(rec); | |
} | |
/** | |
* Add in staging and delete. | |
* @throws Exception | |
*/ | |
@Test | |
public void addStagingAndDelete() throws Exception { | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
String id = getIdForInputStream(f); | |
FileInputStream fin = new FileInputStream(f); | |
closer.register(fin); | |
DataRecord rec = dataStore.addRecord(fin); | |
assertEquals(id, rec.getIdentifier().toString()); | |
assertFile(rec.getStream(), f, folder); | |
rec = dataStore.getRecordIfStored(new DataIdentifier(id)); | |
assertNotNull(rec); | |
assertFile(rec.getStream(), f, folder); | |
dataStore.deleteRecord(new DataIdentifier(id)); | |
rec = dataStore.getRecordIfStored(new DataIdentifier(id)); | |
assertNull(rec); | |
Thread.sleep(1000); | |
//start & finish | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(); | |
rec = dataStore.getRecordIfStored(new DataIdentifier(id)); | |
assertNull(rec); | |
} | |
/** | |
* Get all Identifiers. | |
*/ | |
@Test | |
public void getAllIdentifiers() throws Exception { | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
String id = getIdForInputStream(f); | |
FileInputStream fin = new FileInputStream(f); | |
closer.register(fin); | |
DataRecord rec = dataStore.addRecord(fin); | |
assertEquals(id, rec.getIdentifier().toString()); | |
assertTrue(Iterators.contains(dataStore.getAllIdentifiers(), new DataIdentifier(id))); | |
//start & finish | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(); | |
assertTrue(Iterators.contains(dataStore.getAllIdentifiers(), new DataIdentifier(id))); | |
} | |
@After | |
public void tear() throws Exception { | |
closer.close(); | |
dataStore.close(); | |
} | |
private static void assertFile(InputStream is, File org, TemporaryFolder folder) | |
throws IOException { | |
try { | |
File ret = folder.newFile(); | |
FileUtils.copyInputStreamToFile(is, ret); | |
assertTrue(Files.equal(org, ret)); | |
} finally { | |
IOUtils.closeQuietly(is); | |
} | |
} | |
private String getIdForInputStream(File f) | |
throws Exception { | |
FileInputStream in = null; | |
OutputStream output = null; | |
try { | |
in = new FileInputStream(f); | |
MessageDigest digest = MessageDigest.getInstance("SHA-1"); | |
output = new DigestOutputStream(new NullOutputStream(), digest); | |
IOUtils.copyLarge(in, output); | |
return encodeHexString(digest.digest()); | |
} finally { | |
IOUtils.closeQuietly(output); | |
IOUtils.closeQuietly(in); | |
} | |
} | |
private void waitFinish() { | |
try { | |
// wait for upload finish | |
afterExecuteLatch.await(); | |
// Force execute removal from staging cache | |
ScheduledFuture<?> scheduledFuture = scheduledExecutor | |
.schedule(dataStore.getCache().getStagingCache().new RemoveJob(), 0, TimeUnit.MILLISECONDS); | |
scheduledFuture.get(); | |
LOG.info("After jobs completed"); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.apache.jackrabbit.oak.plugins.blob; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.ScheduledFuture; | |
import java.util.concurrent.TimeUnit; | |
import com.google.common.io.Closer; | |
import com.google.common.io.Files; | |
import com.google.common.util.concurrent.ListeningExecutorService; | |
import com.google.common.util.concurrent.MoreExecutors; | |
import com.google.common.util.concurrent.SettableFuture; | |
import org.apache.commons.io.FileUtils; | |
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; | |
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider; | |
import org.apache.jackrabbit.oak.stats.StatisticsProvider; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Rule; | |
import org.junit.Test; | |
import org.junit.rules.ExpectedException; | |
import org.junit.rules.TemporaryFolder; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.assertFalse; | |
import static org.junit.Assert.assertNotNull; | |
import static org.junit.Assert.assertNull; | |
import static org.junit.Assert.assertTrue; | |
/** | |
* Tests for {@link CompositeDataStoreCache}. | |
*/ | |
public class CompositeDataStoreCacheTest extends AbstractDataStoreCacheTest { | |
private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCacheTest.class); | |
private static final String ID_PREFIX = "12345"; | |
@Rule | |
public TemporaryFolder folder = new TemporaryFolder(new File("target")); | |
@Rule | |
public ExpectedException expectedEx = ExpectedException.none(); | |
private CompositeDataStoreCache cache; | |
private final Closer closer = Closer.create(); | |
private File root; | |
private TestStagingUploader uploader; | |
private TestCacheLoader loader; | |
private CountDownLatch taskLatch; | |
private CountDownLatch callbackLatch; | |
private CountDownLatch afterExecuteLatch; | |
private TestExecutor executor; | |
private StatisticsProvider statsProvider; | |
private ScheduledExecutorService scheduledExecutor; | |
@Before | |
public void setup() throws IOException { | |
root = folder.newFolder(); | |
loader = new TestCacheLoader<String, InputStream>(root); | |
uploader = new TestStagingUploader(root); | |
// create executor | |
taskLatch = new CountDownLatch(1); | |
callbackLatch = new CountDownLatch(1); | |
afterExecuteLatch = new CountDownLatch(1); | |
executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch); | |
// stats | |
ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor(); | |
closer.register(new ExecutorCloser(statsExecutor, 500, TimeUnit.MILLISECONDS)); | |
statsProvider = new DefaultStatisticsProvider(statsExecutor); | |
scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); | |
closer.register(new ExecutorCloser(scheduledExecutor, 500, TimeUnit.MILLISECONDS)); | |
//cache instance | |
cache = new CompositeDataStoreCache(root.getAbsolutePath(), | |
80 * 1024 /* bytes */, 10, 1/*threads*/, loader, | |
uploader, statsProvider, executor, scheduledExecutor, 3000); | |
closer.register(cache); | |
} | |
@After | |
public void tear() throws IOException { | |
closer.close(); | |
} | |
@Test | |
public void zeroCache() throws IOException { | |
cache = new CompositeDataStoreCache(root.getAbsolutePath(), | |
0 /* bytes */, 10, 1/*threads*/, loader, | |
uploader, statsProvider, executor, scheduledExecutor, 3000); | |
closer.register(cache); | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
boolean accepted = cache.stage(ID_PREFIX + 0, f); | |
assertFalse(accepted); | |
assertNull(cache.getIfPresent(ID_PREFIX + 0)); | |
assertNull(cache.get(ID_PREFIX + 0)); | |
assertEquals(0, cache.getStagingCache().getStats().getMaxTotalWeight()); | |
assertEquals(0, cache.getStagingCacheStats().getMaxTotalWeight()); | |
assertEquals(0,cache.getDownloadCache().getStats().getMaxTotalWeight()); | |
assertEquals(0,cache.getCacheStats().getMaxTotalWeight()); | |
cache.invalidate(ID_PREFIX + 0); | |
cache.close(); | |
} | |
/** | |
* {@link CompositeDataStoreCache#getIfPresent(String)} when no cache. | |
*/ | |
@Test | |
public void getIfPresentNoCache() { | |
File file = cache.getIfPresent(ID_PREFIX + 0); | |
assertNull(file); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 0, 0); | |
} | |
/** | |
* {@link CompositeDataStoreCache#get(String)} when no cache. | |
* @throws IOException | |
*/ | |
@Test | |
public void getNoCache() throws IOException { | |
expectedEx.expect(IOException.class); | |
cache.get(ID_PREFIX + 0); | |
} | |
/** | |
* {@link CompositeDataStoreCache#getIfPresent(Object)} when no cache. | |
*/ | |
@Test | |
public void getIfPresentObjectNoCache() { | |
File file = cache.getIfPresent((Object) (ID_PREFIX + 0)); | |
assertNull(file); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 0, 0); | |
assertCacheStats(cache.getDownloadCache().getStats(), 0, 0, 0, 0); | |
} | |
/** | |
* Add to staging | |
*/ | |
@Test | |
public void add() throws Exception { | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
boolean accepted = cache.stage(ID_PREFIX + 0, f); | |
assertTrue(accepted); | |
//start | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(); | |
File file = cache.getIfPresent(ID_PREFIX + 0); | |
assertNotNull(f); | |
assertFile(file, 0, folder); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 1, 1); | |
} | |
/** | |
* Add to staging when cache full. | |
*/ | |
@Test | |
public void addCacheFull() throws IOException { | |
cache = new CompositeDataStoreCache(root.getAbsolutePath(), | |
40 * 1024 /* bytes */, 10 /* staging % */, | |
1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000); | |
closer.register(cache); | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
boolean accepted = cache.stage(ID_PREFIX + 0, f); | |
assertTrue(accepted); | |
File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); | |
accepted = cache.stage(ID_PREFIX + 1, f2); | |
assertFalse(accepted); | |
//start the original upload | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(); | |
File file = cache.getIfPresent(ID_PREFIX + 0); | |
assertNotNull(f); | |
assertFile(file, 0, folder); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 1, 2); | |
} | |
/** | |
* Invalidate from staging. | |
*/ | |
@Test | |
public void invalidateStaging() throws IOException { | |
// create executor | |
taskLatch = new CountDownLatch(2); | |
callbackLatch = new CountDownLatch(2); | |
afterExecuteLatch = new CountDownLatch(2); | |
executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch); | |
cache = new CompositeDataStoreCache(root.getAbsolutePath(), | |
80 * 1024 /* bytes */, 10 /* staging % */, | |
1/*threads*/, loader, uploader, statsProvider, executor, scheduledExecutor, 3000); | |
closer.register(cache); | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
boolean accepted = cache.stage(ID_PREFIX + 0, f); | |
assertTrue(accepted); | |
File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); | |
accepted = cache.stage(ID_PREFIX + 1, f2); | |
assertTrue(accepted); | |
cache.invalidate(ID_PREFIX + 0); | |
//start the original uploads | |
taskLatch.countDown(); | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(); | |
File file = cache.getIfPresent(ID_PREFIX + 0); | |
assertNull(file); | |
file = cache.getIfPresent(ID_PREFIX + 1); | |
assertFile(file, 1, folder); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 2, 2); | |
} | |
/** | |
* Test {@link CompositeDataStoreCache#getIfPresent(String)} when file staged | |
* and then download cache when uploaded. | |
* @throws IOException | |
*/ | |
@Test | |
public void getIfPresentStaged() throws IOException { | |
get(false); | |
} | |
/** | |
* Test {@link CompositeDataStoreCache#get(String)} when file staged and then | |
* download cache when uploaded. | |
* @throws IOException | |
*/ | |
@Test | |
public void getStaged() throws IOException { | |
get(true); | |
} | |
private void get(boolean get) throws IOException { | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
boolean accepted = cache.stage(ID_PREFIX + 0, f); | |
assertTrue(accepted); | |
// hit the staging cache as not uploaded | |
File file; | |
if (get) { | |
file = cache.get(ID_PREFIX + 0); | |
} else { | |
file = cache.getIfPresent(ID_PREFIX + 0); | |
} | |
assertNotNull(file); | |
assertFile(file, 0, folder); | |
assertCacheStats(cache.getStagingCacheStats(), 1, 4 * 1024, 1, 1); | |
//start the original upload | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(); | |
// Not should hit the download cache | |
if (get) { | |
file = cache.get(ID_PREFIX + 0); | |
} else { | |
file = cache.getIfPresent(ID_PREFIX + 0); | |
} | |
assertNotNull(f); | |
assertFile(file, 0, folder); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 1, 1); | |
assertCacheStats(cache.getCacheStats(), 1, 4 * 1024, 1, 1); | |
} | |
/** | |
* Load and get from the download cache. | |
* @throws Exception | |
*/ | |
@Test | |
public void getLoad() throws Exception { | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
loader.write(ID_PREFIX + 0, f); | |
// Not present yet | |
File cached = cache.getIfPresent(ID_PREFIX + 0); | |
assertNull(cached); | |
// present after loading | |
cached = cache.get(ID_PREFIX + 0); | |
assertNotNull(cached); | |
assertTrue(Files.equal(f, cached)); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 0, 0); | |
assertEquals(2, cache.getStagingCacheStats().getLoadCount()); | |
assertEquals(0, cache.getStagingCacheStats().getLoadSuccessCount()); | |
assertCacheStats(cache.getCacheStats(), 1, 4 * 1024, 0, 2); | |
assertEquals(1, cache.getCacheStats().getLoadCount()); | |
assertEquals(1, cache.getCacheStats().getLoadSuccessCount()); | |
} | |
/** | |
* Invalidate cache entry. | |
* @throws Exception | |
*/ | |
@Test | |
public void invalidate() throws Exception { | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
loader.write(ID_PREFIX + 0, f); | |
// present after loading | |
File cached = cache.get(ID_PREFIX + 0); | |
assertNotNull(cached); | |
assertTrue(Files.equal(f, cached)); | |
cache.invalidate(ID_PREFIX + 0); | |
// Not present now | |
cached = cache.getIfPresent(ID_PREFIX + 0); | |
assertNull(cached); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 0, 0); | |
assertEquals(2, cache.getStagingCacheStats().getLoadCount()); | |
assertEquals(0, cache.getStagingCacheStats().getLoadSuccessCount()); | |
assertCacheStats(cache.getCacheStats(), 0, 0, 0, 2); | |
assertEquals(1, cache.getCacheStats().getLoadCount()); | |
assertEquals(1, cache.getCacheStats().getLoadSuccessCount()); | |
/** Check eviction count */ | |
assertEquals(0, cache.getCacheStats().getEvictionCount()); | |
} | |
/** | |
* Concurrently retrieves 2 different files from cache. | |
* @throws Exception | |
*/ | |
@Test | |
public void concurrentGetCached() throws Exception { | |
// Add 2 files to backend | |
// Concurrently get both | |
ListeningExecutorService executorService = | |
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); | |
closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS)); | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
loader.write(ID_PREFIX + 0, f); | |
File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); | |
loader.write(ID_PREFIX + 1, f2); | |
CountDownLatch thread1Start = new CountDownLatch(1); | |
SettableFuture<File> future1 = | |
retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start); | |
CountDownLatch thread2Start = new CountDownLatch(1); | |
SettableFuture<File> future2 = | |
retrieveThread(executorService, ID_PREFIX + 1, cache, thread2Start); | |
thread1Start.countDown(); | |
thread2Start.countDown(); | |
File cached = future1.get(); | |
File cached2 = future2.get(); | |
LOG.info("Async tasks finished"); | |
assertTrue(Files.equal(f, cached)); | |
assertTrue(Files.equal(f2, cached2)); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 0, 0); | |
assertEquals(2, cache.getStagingCacheStats().getLoadCount()); | |
assertEquals(0, cache.getStagingCacheStats().getLoadSuccessCount()); | |
assertCacheStats(cache.getCacheStats(), 2, 8 * 1024, 0, 4); | |
assertEquals(2, cache.getCacheStats().getLoadCount()); | |
assertEquals(2, cache.getCacheStats().getLoadSuccessCount()); | |
} | |
/** | |
* Concurrently retrieves 2 different files from cache. | |
* One is staged and other in the download cache. | |
* @throws Exception | |
*/ | |
@Test | |
public void concurrentGetFromStagedAndCached() throws Exception { | |
// Add 1 to backend | |
// Add 2 to upload area | |
// Stop upload execution | |
// Concurrently get 1 & 2 | |
// continue upload execution | |
ListeningExecutorService executorService = | |
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); | |
closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS)); | |
// Add file to backend | |
File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); | |
loader.write(ID_PREFIX + 1, f2); | |
// stage for upload | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
boolean accepted = cache.stage(ID_PREFIX + 0, f); | |
assertTrue(accepted); | |
// Would hit the staging cache | |
CountDownLatch thread1Start = new CountDownLatch(1); | |
SettableFuture<File> future1 = | |
retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start); | |
// Would hit the download cache and load | |
CountDownLatch thread2Start = new CountDownLatch(1); | |
SettableFuture<File> future2 = | |
retrieveThread(executorService, ID_PREFIX + 1, cache, thread2Start); | |
thread1Start.countDown(); | |
thread2Start.countDown(); | |
File cached = future1.get(); | |
File cached2 = future2.get(); | |
LOG.info("Async tasks finished"); | |
assertFile(cached, 0, folder); | |
assertTrue(Files.equal(f2, cached2)); | |
//start the original upload | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 1, 1); | |
assertEquals(2, cache.getStagingCacheStats().getLoadCount()); | |
assertEquals(1, cache.getStagingCacheStats().getLoadSuccessCount()); | |
assertCacheStats(cache.getCacheStats(), 2, 8 * 1024, 0, 2); | |
assertEquals(1, cache.getCacheStats().getLoadCount()); | |
assertEquals(1, cache.getCacheStats().getLoadSuccessCount()); | |
} | |
/** | |
* Concurrently stage and get a file and then upload. | |
* Use the file retrieve to read contents. | |
* @throws Exception | |
*/ | |
@Test | |
public void concurrentAddGet() throws Exception { | |
// Add to the upload area | |
// stop upload execution | |
// Same as above but concurrently | |
// Get | |
// Continue upload execution | |
ListeningExecutorService executorService = | |
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); | |
closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS)); | |
// stage for upload | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
boolean accepted = cache.stage(ID_PREFIX + 0, f); | |
assertTrue(accepted); | |
// Would hit the staging cache | |
CountDownLatch thread1Start = new CountDownLatch(1); | |
SettableFuture<File> future1 = | |
retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start); | |
// Get a handle to the file and open stream | |
File fileOnUpload = cache.getIfPresent(ID_PREFIX + 0); | |
assertNotNull(fileOnUpload); | |
final FileInputStream fStream = Files.newInputStreamSupplier(fileOnUpload).getInput(); | |
thread1Start.countDown(); | |
//start the original upload | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
future1.get(); | |
waitFinish(); | |
LOG.info("Async tasks finished"); | |
// File was returned from async cache but now deleted | |
// assertFalse(fileOnUpload.exists()); | |
File gold = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
File fromUploadStream = copyToFile(fStream, folder.newFile()); | |
assertTrue(Files.equal(gold, fromUploadStream)); | |
assertCacheStats(cache.getStagingCacheStats(), 0, 0, 1, 1); | |
assertEquals(2, cache.getStagingCacheStats().getLoadCount()); | |
assertEquals(0, cache.getCacheStats().getLoadCount()); | |
assertEquals(0, cache.getCacheStats().getLoadSuccessCount()); | |
} | |
/**--------------------------- Helper Methods -----------------------------------------------**/ | |
private static SettableFuture<File> retrieveThread(ListeningExecutorService executor, | |
final String id, final CompositeDataStoreCache cache, final CountDownLatch start) { | |
final SettableFuture<File> future = SettableFuture.create(); | |
executor.submit(new Runnable() { | |
@Override public void run() { | |
try { | |
LOG.info("Waiting for start retrieve"); | |
start.await(); | |
LOG.info("Starting retrieve [{}]", id); | |
File cached = cache.get(id); | |
LOG.info("Finished retrieve"); | |
future.set(cached); | |
} catch (Exception e) { | |
LOG.info("Exception in get", e); | |
future.setException(e); | |
} | |
} | |
}); | |
return future; | |
} | |
private void waitFinish() { | |
try { | |
// wait for upload finish | |
afterExecuteLatch.await(); | |
// Force execute removal from staging cache | |
ScheduledFuture<?> scheduledFuture = scheduledExecutor | |
.schedule(cache.getStagingCache().new RemoveJob(), 0, TimeUnit.MILLISECONDS); | |
scheduledFuture.get(); | |
LOG.info("After jobs completed"); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
private static void assertCacheStats(DataStoreCacheStatsMBean cache, long elems, long weight, | |
long hits, long count) { | |
assertEquals("elements don't match", elems, cache.getElementCount()); | |
assertEquals("weight doesn't match", weight, cache.estimateCurrentWeight()); | |
assertEquals("hits count don't match", hits, cache.getHitCount()); | |
assertEquals("requests count don't match", count, cache.getRequestCount()); | |
} | |
private void assertFile(File f, int seed, TemporaryFolder folder) throws IOException { | |
assertTrue(f.exists()); | |
File temp = copyToFile(randomStream(seed, 4 * 1024), folder.newFile()); | |
assertTrue("Uploaded file content differs", FileUtils.contentEquals(temp, f)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.apache.jackrabbit.oak.plugins.blob; | |
import java.io.File; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import com.google.common.io.Closer; | |
import com.google.common.io.Files; | |
import com.google.common.util.concurrent.Futures; | |
import com.google.common.util.concurrent.ListenableFuture; | |
import com.google.common.util.concurrent.ListeningExecutorService; | |
import com.google.common.util.concurrent.MoreExecutors; | |
import com.google.common.util.concurrent.SettableFuture; | |
import org.apache.jackrabbit.oak.commons.StringUtils; | |
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Rule; | |
import org.junit.Test; | |
import org.junit.rules.TemporaryFolder; | |
import org.junit.rules.TestName; | |
import static org.apache.commons.io.IOUtils.closeQuietly; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.assertFalse; | |
import static org.junit.Assert.assertNotNull; | |
import static org.junit.Assert.assertNull; | |
import static org.junit.Assert.assertTrue; | |
/** | |
 * Tests for {@link FileCache}.
*/ | |
public class FileCacheTest extends AbstractDataStoreCacheTest { | |
// Common prefix of the cache ids used by the tests; a numeric seed is appended to it.
private static final String ID_PREFIX = "12345"; | |
// Cache under test; built in setup() for every test except "rebuild".
private FileCache cache; | |
// Directory passed to the loader and to FileCache.build.
private File root; | |
// Loader passed to FileCache.build; test files are written to it via loader.write.
private TestCacheLoader loader; | |
// Collects the resources that tear() closes.
private Closer closer; | |
// Temporary folder created under "target".
@Rule | |
public TemporaryFolder folder = new TemporaryFolder(new File("target")); | |
// Name of the running test; setup() uses it to skip cache creation for "rebuild".
@Rule | |
public TestName testName = new TestName(); | |
@Before | |
public void setup() throws Exception { | |
root = folder.newFolder(); | |
closer = Closer.create(); | |
loader = new TestCacheLoader<String, InputStream>(root); | |
if (!testName.getMethodName().equals("rebuild")) { | |
CountDownLatch beforeLatch = new CountDownLatch(1); | |
CountDownLatch afterLatch = new CountDownLatch(1); | |
CountDownLatch afterExecuteLatch = new CountDownLatch(1); | |
TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch); | |
beforeLatch.countDown(); | |
afterLatch.countDown(); | |
cache = FileCache.build(4 * 1024/* KB */, root, loader, executor); | |
Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get(); | |
closer.register(cache); | |
} | |
} | |
@After | |
public void tear() { | |
closeQuietly(closer); | |
} | |
@Test | |
public void zeroCache() throws Exception { | |
cache = FileCache.build(0/* KB */, root, loader, null); | |
closer.register(cache); | |
File f = createFile(0, loader, cache, folder); | |
cache.put(ID_PREFIX + 0, f); | |
assertNull(cache.getIfPresent(ID_PREFIX + 0)); | |
assertNull(cache.get(ID_PREFIX + 0)); | |
assertEquals(0, cache.getStats().getMaxTotalWeight()); | |
cache.invalidate(ID_PREFIX + 0); | |
assertFalse(cache.containsKey(ID_PREFIX + 0)); | |
cache.close(); | |
} | |
/** | |
* Load and get from cache. | |
* @throws Exception | |
*/ | |
@Test | |
public void add() throws Exception { | |
LOG.info("Started add"); | |
File f = createFile(0, loader, cache, folder); | |
assertCache(0, cache, f); | |
assertCacheStats(cache, 1, 4 * 1024, 1, 1); | |
assertEquals("Memory weight different", | |
getWeight(ID_PREFIX + 0, cache.getIfPresent(ID_PREFIX + 0)), | |
cache.getStats().estimateCurrentMemoryWeight()); | |
LOG.info("Finished add"); | |
} | |
/** | |
* Explicitly put in cache. | |
* @throws Exception | |
*/ | |
@Test | |
public void put() throws Exception { | |
LOG.info("Started put"); | |
//File f = FileIOUtils.copy(randomStream(0, 4 * 1024)); | |
cache.put(ID_PREFIX + 0, copyToFile(randomStream(0, 4 * 1024), folder.newFile())); | |
assertCacheIfPresent(0, cache, copyToFile(randomStream(0, 4 * 1024), folder.newFile())); | |
assertCacheStats(cache, 1, 4 * 1024, 0, 0); | |
LOG.info("Finished put"); | |
} | |
/** | |
* Tests {@link FileCache#getIfPresent(Object)} when no cache. | |
*/ | |
@Test | |
public void getIfPresentObjectNoCache() { | |
LOG.info("Started getIfPresentObjectNoCache"); | |
File file = cache.getIfPresent((Object) (ID_PREFIX + 0)); | |
assertNull(file); | |
assertCacheStats(cache, 0, 0, 0, 0); | |
LOG.info("Finished getIfPresentObjectNoCache"); | |
} | |
/** | |
* Retrieves same file concurrently. | |
* @throws Exception | |
*/ | |
@Test | |
public void retrieveSameConcurrent() throws Exception { | |
LOG.info("Started retrieveSameConcurrent"); | |
File f = createFile(0, loader, cache, folder); | |
ListeningExecutorService executorService = | |
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); | |
closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS)); | |
CountDownLatch thread1Start = new CountDownLatch(1); | |
SettableFuture<File> future1 = | |
retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start); | |
CountDownLatch thread2Start = new CountDownLatch(1); | |
SettableFuture<File> future2 = | |
retrieveThread(executorService, ID_PREFIX + 0, cache, thread2Start); | |
thread1Start.countDown(); | |
thread2Start.countDown(); | |
future1.get(); | |
future2.get(); | |
LOG.info("Async tasks finished"); | |
assertCacheIfPresent(0, cache, f); | |
assertCacheStats(cache, 1, 4 * 1024, 1, 1); | |
LOG.info("Finished retrieveSameConcurrent"); | |
} | |
/** | |
* Retrieves different files concurrently. | |
* @throws Exception | |
*/ | |
@Test | |
public void getDifferentConcurrent() throws Exception { | |
LOG.info("Started getDifferentConcurrent"); | |
File f = createFile(0, loader, cache, folder); | |
File f2 = createFile(1, loader, cache, folder); | |
ListeningExecutorService executorService = | |
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); | |
closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS)); | |
CountDownLatch thread1Start = new CountDownLatch(1); | |
SettableFuture<File> future1 = | |
retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start); | |
CountDownLatch thread2Start = new CountDownLatch(1); | |
SettableFuture<File> future2 = | |
retrieveThread(executorService, ID_PREFIX + 1, cache, thread2Start); | |
thread1Start.countDown(); | |
thread2Start.countDown(); | |
future1.get(); | |
future2.get(); | |
LOG.info("Async tasks finished"); | |
assertCacheIfPresent(0, cache, f); | |
assertCacheIfPresent(1, cache, f2); | |
assertCacheStats(cache, 2, 8 * 1024, 2, 2); | |
LOG.info("Finished getDifferentConcurrent"); | |
} | |
/** | |
* Retrieve and put different files concurrently. | |
* @throws Exception | |
*/ | |
@Test | |
public void retrievePutConcurrent() throws Exception { | |
LOG.info("Started retrievePutConcurrent"); | |
//Create load | |
final File f = createFile(0, loader, cache, folder); | |
File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); | |
ListeningExecutorService executorService = | |
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); | |
closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS)); | |
CountDownLatch thread1Start = new CountDownLatch(1); | |
SettableFuture<File> future1 = | |
retrieveThread(executorService, ID_PREFIX + 0, cache, thread1Start); | |
CountDownLatch thread2Start = new CountDownLatch(1); | |
SettableFuture<Boolean> future2 = putThread(executorService, 1, f2, cache, thread2Start); | |
thread1Start.countDown(); | |
thread2Start.countDown(); | |
future1.get(); | |
future2.get(); | |
LOG.info("Async tasks finished"); | |
assertCacheIfPresent(0, cache, f); | |
assertCacheIfPresent(1, cache, copyToFile(randomStream(1, 4 * 1024), folder.newFile())); | |
assertCacheStats(cache, 2, 8 * 1024, 1, 1); | |
LOG.info("Finished retrievePutConcurrent"); | |
} | |
/** | |
* evict explicitly. | |
* @throws Exception | |
*/ | |
@Test | |
public void evictExplicit() throws Exception { | |
LOG.info("Started evictExplicit"); | |
File f = createFile(0, loader, cache, folder); | |
assertCache(0, cache, f); | |
// trigger explicit invalidate | |
cache.invalidate(ID_PREFIX + 0); | |
assertFalse(cache.containsKey(ID_PREFIX + 0)); | |
assertCacheStats(cache, 0, 0, 1, 1); | |
LOG.info("Finished evictExplicit"); | |
} | |
/** | |
* evict implicitly. | |
* @throws Exception | |
*/ | |
@Test | |
public void evictImplicit() throws Exception { | |
LOG.info("Started evictImplicit"); | |
for (int i = 0; i < 15; i++) { | |
File f = createFile(i, loader, cache, folder); | |
assertCache(i, cache, f); | |
} | |
File f = createFile(30, loader, cache, folder); | |
assertCache(30, cache, f); | |
// One of the entries should have been evicted | |
assertTrue(cache.getStats().getElementCount() == 15); | |
assertCacheStats(cache, 15, 60 * 1024, 16, 16); | |
LOG.info("Finished evictImplicit"); | |
} | |
/** | |
* Retrieve and invalidate concurrently. | |
* @throws Exception | |
*/ | |
@Test | |
public void getInvalidateConcurrent() throws Exception { | |
LOG.info("Started getInvalidateConcurrent"); | |
//Create load | |
for (int i = 0; i < 15; i++) { | |
if (i != 4) { | |
File f = createFile(i, loader, cache, folder); | |
assertCache(i, cache, f); | |
} | |
} | |
LOG.info("Finished creating load"); | |
ListeningExecutorService executorService = | |
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); | |
closer.register(new ExecutorCloser(executorService, 5, TimeUnit.MILLISECONDS)); | |
CountDownLatch thread1Start = new CountDownLatch(1); | |
SettableFuture<File> future1 = | |
retrieveThread(executorService, ID_PREFIX + 10, cache, thread1Start); | |
thread1Start.countDown(); | |
File f = createFile(4, loader, cache, folder); | |
CountDownLatch thread2Start = new CountDownLatch(1); | |
SettableFuture<File> future2 = | |
retrieveThread(executorService, ID_PREFIX + 4, cache, thread2Start); | |
thread2Start.countDown(); | |
File f10 = future1.get(); | |
future2.get(); | |
LOG.info("Async tasks finished"); | |
if (f10.exists()) { | |
assertCacheIfPresent(10, cache, f10); | |
} | |
if (f.exists()) { | |
assertCacheIfPresent(4, cache, f); | |
} | |
LOG.info("Finished getInvalidateConcurrent"); | |
} | |
/** | |
* Trigger build cache on start. | |
* @throws Exception | |
*/ | |
@Test | |
public void rebuild() throws Exception { | |
LOG.info("Started rebuild"); | |
root = folder.newFolder(); | |
CountDownLatch beforeLatch = new CountDownLatch(1); | |
CountDownLatch afterLatch = new CountDownLatch(1); | |
CountDownLatch afterExecuteLatch = new CountDownLatch(1); | |
TestExecutor executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch); | |
beforeLatch.countDown(); | |
afterLatch.countDown(); | |
cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor); | |
afterExecuteLatch.await(); | |
Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get(); | |
LOG.info("Cache built"); | |
File f = createFile(0, loader, cache, folder); | |
assertCache(0, cache, f); | |
cache.close(); | |
beforeLatch = new CountDownLatch(1); | |
afterLatch = new CountDownLatch(1); | |
afterExecuteLatch = new CountDownLatch(1); | |
executor = new TestExecutor(1, beforeLatch, afterLatch, afterExecuteLatch); | |
beforeLatch.countDown(); | |
afterLatch.countDown(); | |
cache = FileCache.build(4 * 1024/* bytes */, root, loader, executor); | |
closer.register(cache); | |
afterExecuteLatch.await(); | |
Futures.successfulAsList((Iterable<? extends ListenableFuture<?>>) executor.futures).get(); | |
LOG.info("Cache rebuilt"); | |
assertCacheIfPresent(0, cache, f); | |
assertCacheStats(cache, 1, 4 * 1024, 0, 0); | |
LOG.info("Finished rebuild"); | |
} | |
/**------------------------------ Helper methods --------------------------------------------**/ | |
private static SettableFuture<File> retrieveThread(ListeningExecutorService executor, | |
final String id, final FileCache cache, final CountDownLatch start) { | |
final SettableFuture<File> future = SettableFuture.create(); | |
executor.submit(new Runnable() { | |
@Override public void run() { | |
try { | |
LOG.info("Waiting for start retrieve"); | |
start.await(); | |
LOG.info("Starting retrieve [{}]", id); | |
File cached = cache.get(id); | |
LOG.info("Finished retrieve"); | |
future.set(cached); | |
} catch (Exception e) { | |
LOG.info("Exception in get", e); | |
} | |
} | |
}); | |
return future; | |
} | |
private static SettableFuture<Boolean> putThread(ListeningExecutorService executor, | |
final int seed, final File f, final FileCache cache, final CountDownLatch start) { | |
final SettableFuture<Boolean> future = SettableFuture.create(); | |
executor.submit(new Runnable() { | |
@Override public void run() { | |
try { | |
LOG.info("Waiting for start to put"); | |
start.await(); | |
LOG.info("Starting put"); | |
cache.put(ID_PREFIX + seed, f); | |
LOG.info("Finished put"); | |
future.set(true); | |
} catch (Exception e) { | |
LOG.info("Exception in get", e); | |
} | |
} | |
}); | |
return future; | |
} | |
private static int getWeight(String key, File value) { | |
return StringUtils.estimateMemoryUsage(key) + | |
StringUtils.estimateMemoryUsage(value.getAbsolutePath()) + 48; | |
} | |
private static void assertCacheIfPresent(int seed, FileCache cache, File f) throws IOException { | |
File cached = cache.getIfPresent(ID_PREFIX + seed); | |
assertNotNull(cached); | |
assertTrue(Files.equal(f, cached)); | |
} | |
private static void assertCache(int seed, FileCache cache, File f) throws IOException { | |
File cached = cache.get(ID_PREFIX + seed); | |
assertNotNull(cached); | |
assertTrue(Files.equal(f, cached)); | |
} | |
private static File createFile(int seed, TestCacheLoader loader, FileCache cache, | |
TemporaryFolder folder) throws Exception { | |
File f = copyToFile(randomStream(0, 4 * 1024), | |
folder.newFile()); | |
loader.write(ID_PREFIX + seed, f); | |
assertNull(cache.getIfPresent(ID_PREFIX + seed)); | |
return f; | |
} | |
private static void assertCacheStats(FileCache cache, long elems, long weight, long loads, | |
long loadSuccesses) { | |
assertEquals(elems, cache.getStats().getElementCount()); | |
assertEquals(weight, cache.getStats().estimateCurrentWeight()); | |
assertEquals(loads, cache.getStats().getLoadCount()); | |
assertEquals(loadSuccesses, cache.getStats().getLoadSuccessCount()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, | |
* software distributed under the License is distributed on an | |
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
* KIND, either express or implied. See the License for the | |
* specific language governing permissions and limitations | |
* under the License. | |
*/ | |
package org.apache.jackrabbit.oak.plugins.blob; | |
import java.io.File; | |
import java.io.FileInputStream; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.ScheduledFuture; | |
import java.util.concurrent.TimeUnit; | |
import com.google.common.base.Optional; | |
import com.google.common.collect.Iterators; | |
import com.google.common.collect.Lists; | |
import com.google.common.io.Closer; | |
import com.google.common.io.Files; | |
import com.google.common.util.concurrent.Futures; | |
import com.google.common.util.concurrent.ListenableFuture; | |
import com.google.common.util.concurrent.ListeningExecutorService; | |
import com.google.common.util.concurrent.MoreExecutors; | |
import com.google.common.util.concurrent.SettableFuture; | |
import org.apache.commons.io.FileUtils; | |
import org.apache.jackrabbit.core.data.DataStoreException; | |
import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser; | |
import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider; | |
import org.apache.jackrabbit.oak.stats.StatisticsProvider; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Rule; | |
import org.junit.Test; | |
import org.junit.rules.TemporaryFolder; | |
import org.mockito.Matchers; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.assertFalse; | |
import static org.junit.Assert.assertNull; | |
import static org.junit.Assert.assertTrue; | |
import static org.mockito.Mockito.doThrow; | |
import static org.mockito.Mockito.mock; | |
/** | |
* Tests for {@link UploadStagingCache}. | |
*/ | |
public class UploadStagingCacheTest extends AbstractDataStoreCacheTest { | |
// Logger of this test class.
private static final Logger LOG = LoggerFactory.getLogger(UploadStagingCacheTest.class); | |
// Common prefix of the ids used by the tests; a numeric suffix is appended to it.
private static final String ID_PREFIX = "12345"; | |
// Temporary folder created under "target".
@Rule | |
public TemporaryFolder folder = new TemporaryFolder(new File("target")); | |
// Collects the executors and caches that tear() closes.
private final Closer closer = Closer.create(); | |
// Uploader created over root in init() and passed to UploadStagingCache.build.
private TestStagingUploader uploader; | |
// Directory passed to the uploader and to UploadStagingCache.build.
private File root; | |
// Latches handed to the TestExecutor; tests count them down to let uploads proceed.
private CountDownLatch taskLatch; | |
private CountDownLatch callbackLatch; | |
private CountDownLatch afterExecuteLatch; | |
// Executor passed to UploadStagingCache.build.
private TestExecutor executor; | |
// Cache under test.
private UploadStagingCache stagingCache; | |
// Statistics provider created in init() on its own scheduled executor.
private StatisticsProvider statsProvider; | |
// Single-thread scheduled executor created in init() and registered with the closer.
private ScheduledExecutorService removeExecutor; | |
@Before | |
public void setup() throws IOException { | |
root = folder.newFolder(); | |
init(); | |
} | |
private void init() { | |
// uploader | |
uploader = new TestStagingUploader(root); | |
// create executor | |
taskLatch = new CountDownLatch(1); | |
callbackLatch = new CountDownLatch(1); | |
afterExecuteLatch = new CountDownLatch(1); | |
executor = new TestExecutor(1, taskLatch, callbackLatch, afterExecuteLatch); | |
// stats | |
ScheduledExecutorService statsExecutor = Executors.newSingleThreadScheduledExecutor(); | |
closer.register(new ExecutorCloser(statsExecutor, 500, TimeUnit.MILLISECONDS)); | |
statsProvider = new DefaultStatisticsProvider(statsExecutor); | |
removeExecutor = Executors.newSingleThreadScheduledExecutor(); | |
closer.register(new ExecutorCloser(removeExecutor, 500, TimeUnit.MILLISECONDS)); | |
//cache instance | |
stagingCache = | |
UploadStagingCache.build(root, 1/*threads*/, 8 * 1024 /* bytes */, | |
uploader, null/*cache*/, statsProvider, executor, null, 3000); | |
closer.register(stagingCache); | |
} | |
@After | |
public void tear() throws IOException { | |
closer.close(); | |
} | |
@Test | |
public void testZeroCache() throws IOException { | |
stagingCache = | |
UploadStagingCache.build(root, 1/*threads*/, 0 /* bytes */, | |
uploader, null/*cache*/, statsProvider, executor, null, 3000); | |
closer.register(stagingCache); | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f); | |
assertFalse(future.isPresent()); | |
assertNull(stagingCache.getIfPresent(ID_PREFIX + 0)); | |
assertEquals(0, Iterators.size(stagingCache.getAllIdentifiers())); | |
assertEquals(0, stagingCache.getStats().getMaxTotalWeight()); | |
} | |
/** | |
* Stage file successful upload. | |
* @throws Exception | |
*/ | |
@Test | |
public void testAdd() throws Exception { | |
// add load | |
List<ListenableFuture<Integer>> futures = put(folder); | |
//start | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
assertFuture(futures, 0); | |
assertCacheStats(stagingCache, 0, 0, 1, 1); | |
} | |
/** | |
* Stage file unsuccessful upload. | |
* @throws Exception | |
*/ | |
@Test | |
public void testAddUploadException() throws Exception { | |
// Mock uploader to throw exception on write | |
final TestStagingUploader mockedDS = mock(TestStagingUploader.class); | |
doThrow(new DataStoreException("Error in writing blob")).when(mockedDS) | |
.write(Matchers.any(String.class), Matchers.any(File.class)); | |
// initialize staging cache using the mocked uploader | |
stagingCache = | |
UploadStagingCache.build(root, 1/*threads*/, 4 * 1024 /* bytes */, | |
mockedDS, null/*cache*/, statsProvider, executor, null, 3000); | |
closer.register(stagingCache); | |
// Add load | |
List<ListenableFuture<Integer>> futures = put(folder); | |
//start | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(futures); | |
// assert file retrieved from staging cache | |
File ret = stagingCache.getIfPresent(ID_PREFIX + 0); | |
assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()), ret)); | |
assertEquals(1, stagingCache.getStats().getLoadCount()); | |
assertEquals(1, stagingCache.getStats().getLoadSuccessCount()); | |
assertCacheStats(stagingCache, 1, 4 * 1024, 1, 1); | |
} | |
/** | |
* Retrieve without adding. | |
* @throws Exception | |
*/ | |
@Test | |
public void testGetNoAdd() throws Exception { | |
File ret = stagingCache.getIfPresent(ID_PREFIX + 0); | |
// assert no file | |
assertNull(ret); | |
assertEquals(1, stagingCache.getStats().getLoadCount()); | |
assertCacheStats(stagingCache, 0, 0, 0, 0); | |
} | |
/** | |
* GetAllIdentifiers without adding. | |
* @throws Exception | |
*/ | |
@Test | |
public void testGetAllIdentifiersNoAdd() throws Exception { | |
Iterator<String> ids = stagingCache.getAllIdentifiers(); | |
assertFalse(ids.hasNext()); | |
} | |
/** | |
* Invalidate without adding. | |
* @throws Exception | |
*/ | |
@Test | |
public void testInvalidateNoAdd() throws Exception { | |
stagingCache.invalidate(ID_PREFIX + 0); | |
assertCacheStats(stagingCache, 0, 0, 0, 0); | |
} | |
/** | |
* Error in putting file to stage. | |
* @throws Exception | |
*/ | |
@Test | |
public void testPutMoveFileError() throws Exception { | |
Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, new File("empty")); | |
// assert no file | |
assertFalse(future.isPresent()); | |
assertEquals(1, stagingCache.getStats().getMissCount()); | |
assertCacheStats(stagingCache, 0, 0, 0, 1); | |
} | |
/** | |
* Put and retrieve different files concurrently. | |
* @throws Exception | |
*/ | |
@Test | |
public void testGetAddDifferent() throws Exception { | |
//add load | |
List<ListenableFuture<Integer>> futures = put(folder); | |
// Create an async retrieve task | |
final SettableFuture<File> retFuture = SettableFuture.create(); | |
Thread t = new Thread(new Runnable() { | |
@Override public void run() { | |
retFuture.set(stagingCache.getIfPresent(ID_PREFIX + 1)); | |
} | |
}); | |
//start | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
t.start(); | |
//assert no file retrieve | |
assertNull(retFuture.get()); | |
assertEquals(1, stagingCache.getStats().getLoadCount()); | |
assertFuture(futures, 0); | |
assertCacheStats(stagingCache, 0, 0, 1, 1); | |
} | |
/** | |
* Stage file when cache full. | |
* @throws Exception | |
*/ | |
@Test | |
public void testCacheFullAdd() throws Exception { | |
// initialize cache to have restricted size | |
stagingCache = | |
UploadStagingCache.build(root, 1/*threads*/, 4 * 1024 /* bytes */, | |
uploader, null/*cache*/, statsProvider, executor, null, 3000); | |
closer.register(stagingCache); | |
// add load | |
List<ListenableFuture<Integer>> futures = put(folder); | |
// Add another load | |
File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); | |
Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2); | |
assertFalse(future2.isPresent()); | |
//start | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
assertFuture(futures, 0); | |
// Try 2nd upload again | |
Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 1, f2); | |
futures = Lists.newArrayList(); | |
if (future.isPresent()) { | |
futures.add(future.get()); | |
} | |
assertFuture(futures, 1); | |
assertCacheStats(stagingCache, 0, 0, 2, 3); | |
} | |
/** | |
* GetAllIdentifiers after staging before upload. | |
* @throws Exception | |
*/ | |
@Test | |
public void testGetAllIdentifiers() throws Exception { | |
// add load | |
List<ListenableFuture<Integer>> futures = put(folder); | |
// Check getAllIdentifiers | |
Iterator<String> idsIter = stagingCache.getAllIdentifiers(); | |
assertEquals(ID_PREFIX + 0, Iterators.getOnlyElement(idsIter)); | |
//start | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
assertFuture(futures, 0); | |
assertCacheStats(stagingCache, 0, 0, 1, 1); | |
// Should not return anything | |
idsIter = stagingCache.getAllIdentifiers(); | |
assertEquals(0, Iterators.size(idsIter)); | |
} | |
/** | |
* Invalidate after staging before upload. | |
* @throws Exception | |
*/ | |
@Test | |
public void testInvalidate() throws Exception { | |
// add load | |
List<ListenableFuture<Integer>> futures = put(folder); | |
// Check invalidate | |
stagingCache.invalidate(ID_PREFIX + 0); | |
File file = stagingCache.getIfPresent(ID_PREFIX + 0); | |
assertNull(file); | |
//start | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(futures); | |
assertCacheStats(stagingCache, 0, 0, 1, 1); | |
// Should not return anything | |
file = stagingCache.getIfPresent(ID_PREFIX + 0); | |
assertNull(file); | |
} | |
/** | |
* Stage same file concurrently. | |
* @throws Exception | |
*/ | |
@Test | |
public void testConcurrentSameAdd() throws Exception { | |
// Add load | |
List<ListenableFuture<Integer>> futures = put(folder); | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 0, f); | |
assertFalse(future2.isPresent()); | |
//start | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
assertFuture(futures, 0); | |
assertCacheStats(stagingCache, 0, 0, 1, 2); | |
} | |
/** | |
* Stage different files concurrently | |
* @throws Exception | |
*/ | |
@Test | |
public void testConcurrentDifferentAdd() throws Exception { | |
// Add load | |
List<ListenableFuture<Integer>> futures = put(folder); | |
// Add diff load | |
File f2 = copyToFile(randomStream(1, 4 * 1024), folder.newFile()); | |
Optional<SettableFuture<Integer>> future2 = stagingCache.put(ID_PREFIX + 1, f2); | |
if (future2.isPresent()) { | |
futures.add(future2.get()); | |
} | |
//start | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
assertFuture(futures, 0, 1); | |
assertCacheStats(stagingCache, 0, 0, 2, 2); | |
} | |
/** | |
* Concurrently retrieve after stage but before upload. | |
* @throws Exception | |
*/ | |
@Test | |
public void testConcurrentGetDelete() throws Exception { | |
ListeningExecutorService executorService = | |
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); | |
closer.register(new ExecutorCloser(executorService)); | |
// Add load | |
List<ListenableFuture<Integer>> futures = put(folder); | |
// Get a handle to the file and open stream | |
File file = stagingCache.getIfPresent(ID_PREFIX + 0); | |
final FileInputStream fStream = Files.newInputStreamSupplier(file).getInput(); | |
// task to copy the steam to a file simulating read from the stream | |
File temp = folder.newFile(); | |
CountDownLatch copyThreadLatch = new CountDownLatch(1); | |
SettableFuture<File> future1 = | |
copyStreamThread(executorService, fStream, temp, copyThreadLatch); | |
//start | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
waitFinish(futures); | |
// trying copying now | |
copyThreadLatch.countDown(); | |
future1.get(); | |
assertTrue(Files.equal(temp, uploader.read(ID_PREFIX + 0))); | |
} | |
/** | |
* Concurrently stage and trigger delete after upload for same file. | |
* @throws Exception | |
*/ | |
@Test | |
public void testConcurrentPutDeleteSame() throws Exception { | |
testConcurrentPutDelete(0); | |
} | |
/** | |
* Concurrently stage and trigger delete after upload for different file. | |
* @throws Exception | |
*/ | |
@Test | |
public void testConcurrentPutDeleteDifferent() throws Exception { | |
testConcurrentPutDelete(1); | |
} | |
private void testConcurrentPutDelete(int diff) throws Exception { | |
ListeningExecutorService executorService = | |
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2)); | |
closer.register(new ExecutorCloser(executorService)); | |
//start immediately | |
taskLatch.countDown(); | |
// Add immediately | |
List<ListenableFuture<Integer>> futures = put(folder); | |
// New task to put another file | |
File f2 = copyToFile(randomStream(diff, 4 * 1024), folder.newFile()); | |
CountDownLatch putThreadLatch = new CountDownLatch(1); | |
CountDownLatch triggerLatch = new CountDownLatch(1); | |
SettableFuture<Optional<SettableFuture<Integer>>> future1 = | |
putThread(executorService, diff, f2, stagingCache, putThreadLatch, triggerLatch); | |
putThreadLatch.countDown(); | |
// wait for put thread to go ahead | |
callbackLatch.countDown(); | |
ScheduledFuture<?> scheduledFuture = | |
removeExecutor.schedule(stagingCache.new RemoveJob(), 0, TimeUnit.MILLISECONDS); | |
triggerLatch.await(); | |
if (future1.get().isPresent()) { | |
futures.add(future1.get().get()); | |
} | |
ListenableFuture<List<Integer>> listListenableFuture = Futures.successfulAsList(futures); | |
try { | |
listListenableFuture.get(); | |
scheduledFuture.get(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()), | |
uploader.read(ID_PREFIX + 0))); | |
assertTrue(Files.equal(copyToFile(randomStream(diff, 4 * 1024), folder.newFile()), | |
uploader.read(ID_PREFIX + diff))); | |
} | |
/** | |
* Test build on start. | |
* @throws Exception | |
*/ | |
@Test | |
public void testBuild() throws Exception { | |
// Add load | |
List<ListenableFuture<Integer>> futures = put(folder); | |
// Close before uploading finished | |
closer.close(); | |
// Start again | |
init(); | |
taskLatch.countDown(); | |
callbackLatch.countDown(); | |
afterExecuteLatch.await(); | |
waitFinish(futures); | |
assertNull(stagingCache.getIfPresent(ID_PREFIX + 0)); | |
assertTrue(Files.equal(copyToFile(randomStream(0, 4 * 1024), folder.newFile()), | |
uploader.read(ID_PREFIX + 0))); | |
assertCacheStats(stagingCache, 0, 0, 1, 1); | |
} | |
/** -------------------- Helper methods ----------------------------------------------------**/ | |
private static SettableFuture<File> copyStreamThread(ListeningExecutorService executor, | |
final InputStream fStream, final File temp, final CountDownLatch start) { | |
final SettableFuture<File> future = SettableFuture.create(); | |
executor.submit(new Runnable() { | |
@Override public void run() { | |
try { | |
LOG.info("Waiting for start of copying"); | |
start.await(); | |
LOG.info("Starting copy of [{}]", temp); | |
FileUtils.copyInputStreamToFile(fStream, temp); | |
LOG.info("Finished retrieve"); | |
future.set(temp); | |
} catch (Exception e) { | |
LOG.info("Exception in get", e); | |
} | |
} | |
}); | |
return future; | |
} | |
private static SettableFuture<Optional<SettableFuture<Integer>>> putThread( | |
ListeningExecutorService executor, final int seed, final File f, final UploadStagingCache cache, | |
final CountDownLatch start, final CountDownLatch trigger) { | |
final SettableFuture<Optional<SettableFuture<Integer>>> future = SettableFuture.create(); | |
executor.submit(new Runnable() { | |
@Override public void run() { | |
try { | |
LOG.info("Waiting for start to put"); | |
start.await(); | |
LOG.info("Starting put"); | |
trigger.countDown(); | |
Optional<SettableFuture<Integer>> opt = cache.put(ID_PREFIX + seed, f); | |
LOG.info("Finished put"); | |
future.set(opt); | |
} catch (Exception e) { | |
LOG.info("Exception in get", e); | |
} | |
} | |
}); | |
return future; | |
} | |
private void waitFinish(List<ListenableFuture<Integer>> futures) { | |
ListenableFuture<List<Integer>> listListenableFuture = Futures.successfulAsList(futures); | |
try { | |
listListenableFuture.get(); | |
ScheduledFuture<?> scheduledFuture = | |
removeExecutor.schedule(stagingCache.new RemoveJob(), 0, TimeUnit.MILLISECONDS); | |
scheduledFuture.get(); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
private List<ListenableFuture<Integer>> put(TemporaryFolder folder) | |
throws IOException { | |
File f = copyToFile(randomStream(0, 4 * 1024), folder.newFile()); | |
Optional<SettableFuture<Integer>> future = stagingCache.put(ID_PREFIX + 0, f); | |
List<ListenableFuture<Integer>> futures = Lists.newArrayList(); | |
if (future.isPresent()) { | |
futures.add(future.get()); | |
} | |
return futures; | |
} | |
private void assertFuture(List<ListenableFuture<Integer>> futures, int... seeds) | |
throws Exception { | |
waitFinish(futures); | |
for (int i = 0; i < seeds.length; i++) { | |
File upload = uploader.read(ID_PREFIX + seeds[i]); | |
assertFile(upload, seeds[i], folder); | |
} | |
} | |
private void assertFile(File f, int seed, TemporaryFolder folder) throws IOException { | |
assertTrue(f.exists()); | |
File temp = copyToFile(randomStream(seed, 4 * 1024), folder.newFile()); | |
assertTrue("Uploaded file content differs", FileUtils.contentEquals(temp, f)); | |
} | |
private static void assertCacheStats(UploadStagingCache cache, long elems, long weight, | |
long hits, long count) { | |
assertEquals(elems, cache.getStats().getElementCount()); | |
assertEquals(weight, cache.getStats().estimateCurrentWeight()); | |
assertEquals(hits, cache.getStats().getHitCount()); | |
assertEquals(count, cache.getStats().getRequestCount()); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment