Last active
July 18, 2016 15:46
-
-
Save Donnerbart/797d38b956ad6e72a9c293c935400600 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2008-2016, Hazelcast, Inc. All Rights Reserved. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package com.hazelcast.map; | |
import com.hazelcast.config.Config; | |
import com.hazelcast.config.MapConfig; | |
import com.hazelcast.config.MapIndexConfig; | |
import com.hazelcast.core.HazelcastInstance; | |
import com.hazelcast.core.IMap; | |
import com.hazelcast.nio.ObjectDataInput; | |
import com.hazelcast.nio.ObjectDataOutput; | |
import com.hazelcast.nio.serialization.DataSerializable; | |
import com.hazelcast.query.EntryObject; | |
import com.hazelcast.query.Predicate; | |
import com.hazelcast.query.PredicateBuilder; | |
import com.hazelcast.test.AssertTask; | |
import com.hazelcast.test.HazelcastSerialClassRunner; | |
import com.hazelcast.test.HazelcastTestSupport; | |
import com.hazelcast.test.TestHazelcastInstanceFactory; | |
import com.hazelcast.test.annotation.SlowTest; | |
import org.junit.After; | |
import org.junit.Before; | |
import org.junit.Test; | |
import org.junit.experimental.categories.Category; | |
import org.junit.runner.RunWith; | |
import java.io.IOException; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import static org.junit.Assert.assertEquals; | |
import static org.junit.Assert.assertTrue; | |
/** | |
* Creates a map that is used to test data consistency while nodes are joining and leaving the cluster. | |
* | |
* The basic idea is pretty simple. We'll add a number to a list for each key in the IMap. This allows us to verify whether | |
* the numbers are added in the correct order and also whether there's any data loss as nodes leave or join the cluster. | |
*/ | |
@RunWith(HazelcastSerialClassRunner.class) | |
@Category(SlowTest.class) | |
public class EntryProcessorBouncingNodesTest extends HazelcastTestSupport { | |
private static final int ENTRIES = 10; | |
private static final int ITERATIONS = 50; | |
private static final String MAP_NAME = "entryProcessorBouncingNodesTestMap"; | |
private TestHazelcastInstanceFactory instanceFactory; | |
@Before | |
public void setUp() { | |
instanceFactory = new TestHazelcastInstanceFactory(500); | |
} | |
@After | |
public void tearDown() { | |
instanceFactory.shutdownAll(); | |
} | |
/** | |
* Tests {@link com.hazelcast.map.impl.operation.EntryOperation}. | |
*/ | |
@Test | |
public void testEntryProcessorWhileTwoNodesAreBouncing() { | |
testEntryProcessorWhileTwoNodesAreBouncing(false); | |
} | |
/** | |
* Tests {@link com.hazelcast.map.impl.operation.MultipleEntryWithPredicateOperation}. | |
*/ | |
@Test | |
public void testEntryProcessorWhileTwoNodesAreBouncing_withPredicate() { | |
testEntryProcessorWhileTwoNodesAreBouncing(true); | |
} | |
private void testEntryProcessorWhileTwoNodesAreBouncing(boolean withPredicate) { | |
CountDownLatch startLatch = new CountDownLatch(1); | |
AtomicBoolean isRunning = new AtomicBoolean(true); | |
// start up three instances | |
HazelcastInstance instance = newInstance(withPredicate); | |
HazelcastInstance instance2 = newInstance(withPredicate); | |
HazelcastInstance instance3 = newInstance(withPredicate); | |
final IMap<Integer, ListHolder> map = instance.getMap(MAP_NAME); | |
final ListHolder expected = new ListHolder(); | |
// initialize the list synchronously to ensure the map is correctly initialized | |
InitMapProcessor initProcessor = new InitMapProcessor(); | |
for (int i = 0; i < ENTRIES; ++i) { | |
map.executeOnKey(i, initProcessor); | |
} | |
assertEquals(ENTRIES, map.size()); | |
// spin up the thread that stops/starts the instance2 and instance3, always keeping one instance running | |
Runnable runnable = new TwoNodesRestartingRunnable(startLatch, isRunning, withPredicate, instance2, instance3); | |
Thread bounceThread = new Thread(runnable); | |
bounceThread.start(); | |
// now, with nodes joining and leaving the cluster concurrently, start adding numbers to the lists | |
int iteration = 0; | |
while (iteration < ITERATIONS) { | |
if (iteration == 30) { | |
// let the bounce threads start bouncing | |
startLatch.countDown(); | |
} | |
IncrementProcessor processor = new IncrementProcessor(iteration); | |
expected.add(iteration); | |
for (int i = 0; i < ENTRIES; ++i) { | |
if (withPredicate) { | |
EntryObject eo = new PredicateBuilder().getEntryObject(); | |
Predicate keyPredicate = eo.key().equal(i); | |
map.executeOnEntries(processor, keyPredicate); | |
} else { | |
map.executeOnKey(i, processor); | |
} | |
} | |
// give processing time to catch up | |
++iteration; | |
} | |
// signal the bounce threads that we're done | |
isRunning.set(false); | |
// wait for the instance bounces to complete | |
assertJoinable(bounceThread); | |
final CountDownLatch latch = new CountDownLatch(ENTRIES); | |
for (int i = 0; i < ENTRIES; ++i) { | |
final int id = i; | |
new Thread(new Runnable() { | |
@Override | |
public void run() { | |
assertTrueEventually(new AssertTask() { | |
@Override | |
public void run() throws Exception { | |
assertTrue(expected.size() <= map.get(id).size()); | |
} | |
}); | |
latch.countDown(); | |
} | |
}).start(); | |
} | |
assertOpenEventually(latch); | |
if (withPredicate) { | |
// TODO: check why this is failing without predicates | |
for (int index = 0; index < ENTRIES; ++index) { | |
ListHolder holder = map.get(index); | |
assertEquals("The ListHolder should contain ITERATIONS entries", ITERATIONS, holder.size()); | |
for (int i = 0; i < ITERATIONS; i++) { | |
assertEquals(i, holder.get(i)); | |
} | |
} | |
} | |
} | |
private HazelcastInstance newInstance(boolean withPredicate) { | |
MapConfig mapConfig = new MapConfig(MAP_NAME); | |
mapConfig.setBackupCount(2); | |
if (withPredicate) { | |
mapConfig.addMapIndexConfig(new MapIndexConfig("__key", true)); | |
} | |
Config config = new Config(); | |
config.addMapConfig(mapConfig); | |
return instanceFactory.newHazelcastInstance(config); | |
} | |
private class TwoNodesRestartingRunnable implements Runnable { | |
private final CountDownLatch start; | |
private final AtomicBoolean isRunning; | |
private final boolean withPredicate; | |
private HazelcastInstance instance1; | |
private HazelcastInstance instance2; | |
private TwoNodesRestartingRunnable(CountDownLatch startLatch, AtomicBoolean isRunning, boolean withPredicate, | |
HazelcastInstance h1, HazelcastInstance h2) { | |
this.start = startLatch; | |
this.isRunning = isRunning; | |
this.withPredicate = withPredicate; | |
this.instance1 = h1; | |
this.instance2 = h2; | |
} | |
@Override | |
public void run() { | |
try { | |
start.await(); | |
while (isRunning.get()) { | |
instance1.shutdown(); | |
instance2.shutdown(); | |
sleepMillis(10); | |
instance1 = newInstance(withPredicate); | |
instance2 = newInstance(withPredicate); | |
} | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
private static class InitMapProcessor extends AbstractEntryProcessor<Integer, ListHolder> { | |
@Override | |
public Object process(Map.Entry<Integer, ListHolder> entry) { | |
entry.setValue(new ListHolder()); | |
return null; | |
} | |
} | |
private static class IncrementProcessor extends AbstractEntryProcessor<Integer, ListHolder> { | |
private final int nextVal; | |
private IncrementProcessor(int nextVal) { | |
this.nextVal = nextVal; | |
} | |
@Override | |
public Object process(Map.Entry<Integer, ListHolder> entry) { | |
ListHolder holder = entry.getValue(); | |
if (holder == null) { | |
holder = new ListHolder(); | |
} | |
holder.add(nextVal); | |
entry.setValue(holder); | |
return null; | |
} | |
} | |
private static class ListHolder implements DataSerializable { | |
private List<Integer> list = new ArrayList<Integer>(); | |
private int size; | |
public ListHolder() { | |
} | |
@Override | |
public void writeData(ObjectDataOutput out) throws IOException { | |
out.writeInt(list.size()); | |
for (Integer value : list) { | |
out.writeInt(value); | |
} | |
} | |
@Override | |
public void readData(ObjectDataInput in) throws IOException { | |
size = in.readInt(); | |
list = new ArrayList<Integer>(size); | |
for (int i = 0; i < size; i++) { | |
list.add(in.readInt()); | |
} | |
} | |
public void add(int value) { | |
list.add(value); | |
size++; | |
} | |
public int get(int index) { | |
return list.get(index); | |
} | |
public int size() { | |
return size; | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment