Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save krosenvold/fa20521ad121a0cbb4c6ed6be91452e5 to your computer and use it in GitHub Desktop.
Save krosenvold/fa20521ad121a0cbb4c6ed6be91452e5 to your computer and use it in GitHub Desktop.
Concurrency problem with loadCache
import java.io.IOException;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.cache.Cache;
import javax.cache.configuration.Factory;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.plugin.segmentation.SegmentationPolicy;
/**
 * Stand-alone program that starts an Ignite node, creates several replicated caches backed by
 * {@link TestCacheStore}, and for each cache either loads it from the store (when the local
 * copy is empty) or waits for it to reach {@code CNT} entries and verifies every value.
 *
 * <p>NOTE(review): presumably meant to be run in more than one JVM so that a later node
 * exercises the "check" branch while an earlier one has populated the caches — confirm.
 */
public class IgniteTester {
    /** Number of entries produced by {@link TestCacheStore} and expected in every cache. */
    private static final int CNT = 1_500_000;

    /**
     * Starts the node and runs {@link #populateOrCheck} on caches {@code testCache},
     * {@code testCache2} ... {@code testCache6}, in that order.
     *
     * @param args unused
     * @throws InterruptedException if interrupted while waiting for a cache to fill
     * @throws IOException if the temporary work directory cannot be created
     * @throws IgniteCheckedException if the node fails to start
     */
    public static void main(String[] args) throws InterruptedException, IOException, IgniteCheckedException {
        Ignite ignite = IgnitionEx.start(baseConfigNoDiscovery("lonely-testGrid"));
        final IgniteCache<Integer, String> testCache = ignite.getOrCreateCache(cacheConfig("testCache"));
        testCache.rebalance().get(); // because ignite 1.5 does not start cache properly replicated
        populateOrCheck(testCache);

        // Remaining caches are named testCache2 .. testCache6 and handled one after another.
        for (int suffix = 2; suffix <= 6; suffix++) {
            IgniteCache<Integer, String> cache = ignite.getOrCreateCache(cacheConfig("testCache" + suffix));
            populateOrCheck(cache);
        }
    }

    /**
     * Loads the cache from its store when the local primary+backup size is zero; otherwise
     * polls once per second until the local size reaches {@code CNT} and then checks that
     * every key {@code 0 .. CNT-1} maps to {@link #genString(int)} of that key.
     *
     * @param testCache cache to populate or verify
     * @throws InterruptedException if interrupted while sleeping between size polls
     * @throws IllegalStateException if a key is missing or holds an unexpected value
     */
    private static void populateOrCheck(IgniteCache<Integer, String> testCache) throws InterruptedException {
        if (testCache.localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP) == 0) {
            System.out.print("Cache is empty, loading...");
            testCache.loadCache((k, v) -> true);
            System.out.println("Data loaded.");
            return;
        }

        int currentSize;
        while ((currentSize = testCache.localSize(CachePeekMode.PRIMARY, CachePeekMode.BACKUP)) < CNT) {
            System.out.println("currentSize = " + currentSize + " waiting for cache to get proper size");
            Thread.sleep(1000);
        }

        for (int key = 0; key < CNT; key++) {
            final String expected = genString(key);
            final String actual = testCache.get(key);
            // Compare from the known non-null side: a missing entry yields null from get(),
            // which must be reported as bad state rather than crash with a NullPointerException.
            if (!expected.equals(actual)) {
                throw new IllegalStateException(
                    "No good state: key " + key + " expected '" + expected + "' but was '" + actual + "'");
            }
        }
        System.out.println("We seem to get correct data");
    }

    /**
     * Builds the value stored under key {@code i}: an underscore followed by the decimal key.
     *
     * @param i cache key
     * @return {@code "_" + i}
     */
    private static String genString(int i) {
        return "_" + i;
    }

    /**
     * Creates the configuration shared by all caches in this program: replicated mode,
     * synchronous rebalancing, and a store produced by {@link TestFactory}.
     *
     * @param testCache name of the cache
     * @return a new cache configuration
     */
    public static CacheConfiguration<Integer, String> cacheConfig(String testCache) {
        CacheConfiguration<Integer, String> config = new CacheConfiguration<>();
        config.setName(testCache);
        config.setCacheMode(CacheMode.REPLICATED);
        config.setRebalanceMode(CacheRebalanceMode.SYNC);
        config.setCacheStoreFactory(new TestFactory());
        return config;
    }

    /**
     * Creates the node configuration: 30 s failure detection timeout, a fresh temporary work
     * directory, NOOP segmentation policy and metrics logging switched off.
     *
     * <p>NOTE(review): despite the method name, nothing here configures discovery — confirm
     * whether that is intended.
     *
     * @param gridName name given to the grid
     * @return a new node configuration
     * @throws IOException if the temporary work directory cannot be created
     */
    public static IgniteConfiguration baseConfigNoDiscovery(String gridName) throws IOException {
        IgniteConfiguration igniteConfiguration = new IgniteConfiguration();
        igniteConfiguration.setFailureDetectionTimeout(30_000);
        igniteConfiguration.setWorkDirectory(java.nio.file.Files.createTempDirectory("tc").toFile().getAbsolutePath());
        igniteConfiguration.setGridName(gridName);
        igniteConfiguration.setSegmentationPolicy(SegmentationPolicy.NOOP);
        igniteConfiguration.setMetricsLogFrequency(0);
        return igniteConfiguration;
    }

    /** Serializable factory handing out a new {@link TestCacheStore} on every call. */
    public static class TestFactory implements Factory<CacheStore<? super Object, ? super Object>>, Serializable {
        /** @return a fresh store instance */
        @Override public CacheStore<? super Object, ? super Object> create() {
            return new TestCacheStore<Object, Object>();
        }
    }

    /**
     * Counted down once by {@link TestCacheStore#loadCache} when it reaches entry 100&nbsp;000.
     * Nothing in this class awaits it.
     */
    static CountDownLatch globalLatch = new CountDownLatch(1);

    /**
     * Cache store whose only real operation is {@link #loadCache}: it emits {@code CNT}
     * entries mapping each integer key to {@link IgniteTester#genString(int)}. All other
     * operations are no-ops; the two load methods return {@code null}.
     *
     * @param <K> key type declared to the cache (the emitted keys are always boxed ints)
     * @param <V> value type declared to the cache (the emitted values are always strings)
     */
    public static class TestCacheStore<K, V> implements CacheStore<K, V>, Serializable {
        /**
         * Feeds keys {@code 0 .. CNT-1} with their generated values into {@code clo}, counting
         * down {@link IgniteTester#globalLatch} just before entry 100&nbsp;000 is emitted.
         *
         * @param clo sink receiving every generated key/value pair
         * @param args ignored
         */
        @Override public void loadCache(IgniteBiInClosure<K, V> clo, Object... args) throws CacheLoaderException {
            // The store always emits Integer -> String regardless of K and V, exactly as the
            // caches in this program are typed; the cast only widens the sink's static type.
            @SuppressWarnings("unchecked")
            IgniteBiInClosure<Object, Object> sink = (IgniteBiInClosure<Object, Object>) (IgniteBiInClosure<?, ?>) clo;

            for (int i = 0; i < CNT; i++) {
                if (i == 100_000) {
                    globalLatch.countDown();
                }
                sink.apply(i, genString(i));
            }
        }

        /** No-op. */
        @Override public void sessionEnd(boolean commit) throws CacheWriterException {
        }

        /** @return always {@code null} */
        @Override public V load(K key) throws CacheLoaderException {
            return null;
        }

        /** @return always {@code null} */
        @Override public Map<K, V> loadAll(Iterable<? extends K> keys) throws CacheLoaderException {
            return null;
        }

        /** No-op. */
        @Override public void write(Cache.Entry<? extends K, ? extends V> entry) throws CacheWriterException {
        }

        /** No-op. */
        @Override public void writeAll(Collection<Cache.Entry<? extends K, ? extends V>> entries) throws CacheWriterException {
        }

        /** No-op. */
        @Override public void delete(Object key) throws CacheWriterException {
        }

        /** No-op. */
        @Override public void deleteAll(Collection<?> keys) throws CacheWriterException {
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment