Created
August 17, 2013 22:17
-
-
Save gustavoanatoly/6258956 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java | |
index c1723e2..6488b32 100644 | |
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java | |
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java | |
@@ -18,33 +18,41 @@ | |
package org.apache.hadoop.hbase; | |
-import java.io.IOException; | |
-import java.util.ArrayList; | |
-import java.util.Collection; | |
-import java.util.List; | |
+import static org.junit.Assert.assertFalse; | |
+import static org.junit.Assert.assertNotEquals; | |
+import static org.junit.Assert.assertTrue; | |
-import junit.framework.Assert; | |
+import java.util.ArrayList; | |
+import java.util.HashMap; | |
+import java.util.HashSet; | |
+import java.util.List; | |
+import java.util.Map; | |
+import java.util.Map.Entry; | |
+import java.util.Set; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
+import org.apache.hadoop.conf.Configuration; | |
+import org.apache.hadoop.hbase.catalog.CatalogTracker; | |
+import org.apache.hadoop.hbase.executor.EventType; | |
+import org.apache.hadoop.hbase.executor.ExecutorService; | |
+import org.apache.hadoop.hbase.executor.ExecutorType; | |
+import org.apache.hadoop.hbase.master.AssignmentManager; | |
import org.apache.hadoop.hbase.master.HMaster; | |
-import org.apache.hadoop.hbase.protobuf.RequestConverter; | |
+import org.apache.hadoop.hbase.master.LoadBalancer; | |
+import org.apache.hadoop.hbase.master.RegionPlan; | |
+import org.apache.hadoop.hbase.master.RegionState; | |
import org.apache.hadoop.hbase.master.ServerManager; | |
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil; | |
-import org.apache.hadoop.hbase.regionserver.HRegion; | |
-import org.apache.hadoop.hbase.regionserver.HRegionServer; | |
-import org.apache.hadoop.hbase.util.Bytes; | |
-import org.apache.hadoop.hbase.util.JVMClusterUtil; | |
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; | |
-import org.apache.hadoop.hbase.util.Threads; | |
+import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; | |
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState; | |
import org.apache.hadoop.hbase.zookeeper.ZKAssign; | |
-import org.apache.hadoop.hbase.zookeeper.ZKUtil; | |
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; | |
import org.apache.zookeeper.KeeperException; | |
import org.junit.AfterClass; | |
import org.junit.BeforeClass; | |
import org.junit.Test; | |
import org.junit.experimental.categories.Category; | |
+import org.mockito.Mockito; | |
/** | |
@@ -54,258 +62,255 @@ | |
* master does no assign regions to a regionserver marked as drained. | |
* | |
* @see <a href="https://issues.apache.org/jira/browse/HBASE-4298">HBASE-4298</a> | |
+ * | |
+ * | |
+ * Formerly an integration test on a full mini cluster; rewritten as a mock-based unit test. | |
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-7462">HBASE-7462</a> | |
*/ | |
@Category(MediumTests.class) | |
public class TestDrainingServer { | |
private static final Log LOG = LogFactory.getLog(TestDrainingServer.class); | |
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); | |
- private static final int NB_SLAVES = 5; | |
- private static final int COUNT_OF_REGIONS = NB_SLAVES * 2; | |
- | |
- /** | |
- * Spin up a cluster with a bunch of regions on it. | |
- */ | |
- @BeforeClass | |
- public static void setUpBeforeClass() throws Exception { | |
- TEST_UTIL.startMiniCluster(NB_SLAVES); | |
- TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); | |
- TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true); | |
- | |
- final List<String> families = new ArrayList<String>(1); | |
- families.add("family"); | |
- TEST_UTIL.createRandomTable("table", families, 1, 0, 0, COUNT_OF_REGIONS, 0); | |
- // Ensure a stable env | |
- TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false); | |
- | |
- boolean ready = false; | |
- while (!ready){ | |
- waitForAllRegionsOnline(); | |
- | |
- // Assert that every regionserver has some regions on it. | |
- int i = 0; | |
- ready = true; | |
- while (i < NB_SLAVES && ready){ | |
- HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i); | |
- if (ProtobufUtil.getOnlineRegions(hrs).isEmpty()){ | |
- ready = false; | |
- } | |
- i++; | |
- } | |
- | |
- if (!ready){ | |
- TEST_UTIL.getHBaseAdmin().setBalancerRunning(true, true); | |
- Assert.assertTrue("Can't start a balance!", TEST_UTIL.getHBaseAdmin().balancer()); | |
- TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false); | |
- Thread.sleep(100); | |
- } | |
+ private Abortable abortable = new Abortable() { | |
+ @Override | |
+ public boolean isAborted() { | |
+ return false; | |
} | |
- } | |
- private static HRegionServer setDrainingServer(final HRegionServer hrs) | |
- throws KeeperException { | |
- LOG.info("Making " + hrs.getServerName() + " the draining server; " + | |
- "it has " + hrs.getNumberOfOnlineRegions() + " online regions"); | |
- ZooKeeperWatcher zkw = hrs.getZooKeeper(); | |
- String hrsDrainingZnode = | |
- ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString()); | |
- ZKUtil.createWithParents(zkw, hrsDrainingZnode); | |
- return hrs; | |
- } | |
+ @Override | |
+ public void abort(String why, Throwable e) { | |
- private static HRegionServer unsetDrainingServer(final HRegionServer hrs) | |
- throws KeeperException { | |
- ZooKeeperWatcher zkw = hrs.getZooKeeper(); | |
- String hrsDrainingZnode = | |
- ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString()); | |
- ZKUtil.deleteNode(zkw, hrsDrainingZnode); | |
- return hrs; | |
- } | |
+ } | |
+ }; | |
@AfterClass | |
- public static void tearDownAfterClass() throws Exception { | |
- TEST_UTIL.shutdownMiniCluster(); | |
+ public static void afterClass() throws Exception { | |
+ TEST_UTIL.shutdownMiniZKCluster(); | |
+ } | |
+ | |
+ @BeforeClass | |
+ public static void beforeClass() throws Exception { | |
+ TEST_UTIL.startMiniZKCluster(); | |
} | |
- /** | |
- * Test adding server to draining servers and then move regions off it. | |
- * Make sure that no regions are moved back to the draining server. | |
- * @throws IOException | |
- * @throws KeeperException | |
- */ | |
- @Test // (timeout=30000) | |
- public void testDrainingServerOffloading() | |
- throws Exception { | |
- // I need master in the below. | |
- HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster(); | |
- HRegionInfo hriToMoveBack = null; | |
- // Set first server as draining server. | |
- HRegionServer drainingServer = | |
- setDrainingServer(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0)); | |
- try { | |
- final int regionsOnDrainingServer = | |
- drainingServer.getNumberOfOnlineRegions(); | |
- Assert.assertTrue(regionsOnDrainingServer > 0); | |
- List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(drainingServer); | |
- for (HRegionInfo hri : hris) { | |
- // Pass null and AssignmentManager will chose a random server BUT it | |
- // should exclude draining servers. | |
- master.moveRegion(null, | |
- RequestConverter.buildMoveRegionRequest(hri.getEncodedNameAsBytes(), null)); | |
- // Save off region to move back. | |
- hriToMoveBack = hri; | |
- } | |
- // Wait for regions to come back on line again. | |
- waitForAllRegionsOnline(); | |
- Assert.assertEquals(0, drainingServer.getNumberOfOnlineRegions()); | |
- } finally { | |
- unsetDrainingServer(drainingServer); | |
+ @Test | |
+ public void testAssignmentManagerDoesntUseDrainingServer() throws Exception { | |
+ AssignmentManager am = null; | |
+ Configuration conf = TEST_UTIL.getConfiguration(); | |
+ final HMaster master = Mockito.mock(HMaster.class); | |
+ final Server server = Mockito.mock(Server.class); | |
+ final ServerManager serverManager = Mockito.mock(ServerManager.class); | |
+ final ServerName SERVERNAME_A = new ServerName("mockserver_a.org", 1000, 8000); | |
+ final ServerName SERVERNAME_B = new ServerName("mockserver_b.org", 1001, 8000); | |
+ LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf); | |
+ CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class); | |
+ final HRegionInfo REGIONINFO = new HRegionInfo(TableName.valueOf("table_test"), | |
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
+ | |
+ List<ServerName> favored = new ArrayList<ServerName>(); | |
+ favored.add(SERVERNAME_A); | |
+ favored.add(SERVERNAME_B); | |
+ | |
+ ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), | |
+ "zkWatcher-Test", abortable, true); | |
+ | |
+ Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>(); | |
+ | |
+ onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD); | |
+ onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD); | |
+ | |
+ | |
+ Mockito.when(server.getConfiguration()).thenReturn(conf); | |
+ Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1")); | |
+ Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher); | |
+ | |
+ Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers); | |
+ Mockito.when(serverManager.getOnlineServersList()) | |
+ .thenReturn(new ArrayList<ServerName>(onlineServers.keySet())); | |
+ | |
+ Mockito.when(serverManager.createDestinationServersList()) | |
+ .thenReturn(new ArrayList<ServerName>(onlineServers.keySet())); | |
+ Mockito.when(serverManager.createDestinationServersList(null)) | |
+ .thenReturn(new ArrayList<ServerName>(onlineServers.keySet())); | |
+ | |
+ for (ServerName sn : onlineServers.keySet()) { | |
+ Mockito.when(serverManager.isServerOnline(sn)).thenReturn(true); | |
+ Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1)).thenReturn(true); | |
+ Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1, null, false)).thenReturn(true); | |
+ Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, new ArrayList<ServerName>())) | |
+ .thenReturn(RegionOpeningState.OPENED); | |
+ Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, null)) | |
+ .thenReturn(RegionOpeningState.OPENED); | |
+ Mockito.when(serverManager.addServerToDrainList(sn)).thenReturn(true); | |
} | |
- // Now we've unset the draining server, we should be able to move a region | |
- // to what was the draining server. | |
- master.moveRegion(null, | |
- RequestConverter.buildMoveRegionRequest(hriToMoveBack.getEncodedNameAsBytes(), | |
- Bytes.toBytes(drainingServer.getServerName().toString()))); | |
- // Wait for regions to come back on line again. | |
- waitForAllRegionsOnline(); | |
- Assert.assertEquals(1, drainingServer.getNumberOfOnlineRegions()); | |
+ | |
+ Mockito.when(master.getServerManager()).thenReturn(serverManager); | |
+ | |
+ am = new AssignmentManager(server, serverManager, catalogTracker, | |
+ balancer, startupMasterExecutor("mockExecutorService"), null, null); | |
+ | |
+ Mockito.when(master.getAssignmentManager()).thenReturn(am); | |
+ Mockito.when(master.getZooKeeperWatcher()).thenReturn(zkWatcher); | |
+ Mockito.when(master.getZooKeeper()).thenReturn(zkWatcher); | |
+ | |
+ am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_A)); | |
+ am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_B)); | |
+ | |
+ zkWatcher.registerListenerFirst(am); | |
+ | |
+ addServerToDrainedList(SERVERNAME_A, onlineServers, serverManager); | |
+ | |
+ | |
+ am.assign(REGIONINFO, true); | |
+ | |
+ setRegionOpenedOnZK(zkWatcher, SERVERNAME_A, REGIONINFO); | |
+ setRegionOpenedOnZK(zkWatcher, SERVERNAME_B, REGIONINFO); | |
+ | |
+ | |
+ am.waitForAssignment(REGIONINFO); | |
+ | |
+ | |
+ assertTrue(am.getRegionStates().isRegionAssigned(REGIONINFO)); | |
+ assertNotEquals(am.getRegionStates().getRegionServerOfRegion(REGIONINFO), SERVERNAME_A); | |
} | |
- /** | |
- * Test that draining servers are ignored even after killing regionserver(s). | |
- * Verify that the draining server is not given any of the dead servers regions. | |
- * @throws KeeperException | |
- * @throws IOException | |
- */ | |
- @Test (timeout=30000) | |
- public void testDrainingServerWithAbort() throws KeeperException, Exception { | |
- HMaster master = TEST_UTIL.getHBaseCluster().getMaster(); | |
+ @Test | |
+ public void testAssignmentManagerDoesntUseDrainedServerWithBulkAssign() throws Exception { | |
+ Configuration conf = TEST_UTIL.getConfiguration(); | |
+ LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf); | |
+ CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class); | |
+ AssignmentManager am = null; | |
+ final HMaster master = Mockito.mock(HMaster.class); | |
+ final Server server = Mockito.mock(Server.class); | |
+ final ServerManager serverManager = Mockito.mock(ServerManager.class); | |
+ final ServerName SERVERNAME_A = new ServerName("mockserverbulk_a.org", 1000, 8000); | |
+ final ServerName SERVERNAME_B = new ServerName("mockserverbulk_b.org", 1001, 8000); | |
+ final ServerName SERVERNAME_C = new ServerName("mockserverbulk_c.org", 1002, 8000); | |
+ final ServerName SERVERNAME_D = new ServerName("mockserverbulk_d.org", 1003, 8000); | |
+ final ServerName SERVERNAME_E = new ServerName("mockserverbulk_e.org", 1004, 8000); | |
+ final Map<HRegionInfo, ServerName> bulk = new HashMap<HRegionInfo, ServerName>(); | |
- waitForAllRegionsOnline(); | |
+ Set<ServerName> bunchServersAssigned = new HashSet<ServerName>(); | |
+ | |
+ HRegionInfo REGIONINFO_A = new HRegionInfo(TableName.valueOf("table_A"), | |
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
+ HRegionInfo REGIONINFO_B = new HRegionInfo(TableName.valueOf("table_B"), | |
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
+ HRegionInfo REGIONINFO_C = new HRegionInfo(TableName.valueOf("table_C"), | |
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
+ HRegionInfo REGIONINFO_D = new HRegionInfo(TableName.valueOf("table_D"), | |
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
+ HRegionInfo REGIONINFO_E = new HRegionInfo(TableName.valueOf("table_E"), | |
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
- final long regionCount = TEST_UTIL.getMiniHBaseCluster().countServedRegions(); | |
+ Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>(); | |
+ List<ServerName> drainedServers = new ArrayList<ServerName>(); | |
- // Let's get a copy of the regions today. | |
- Collection<HRegion> regions = getRegions(); | |
- LOG.info("All regions: " + regions); | |
+ onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD); | |
+ onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD); | |
+ onlineServers.put(SERVERNAME_C, ServerLoad.EMPTY_SERVERLOAD); | |
+ onlineServers.put(SERVERNAME_D, ServerLoad.EMPTY_SERVERLOAD); | |
+ onlineServers.put(SERVERNAME_E, ServerLoad.EMPTY_SERVERLOAD); | |
- // Choose the draining server | |
- HRegionServer drainingServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0); | |
- final int regionsOnDrainingServer = drainingServer.getNumberOfOnlineRegions(); | |
- Assert.assertTrue(regionsOnDrainingServer > 0); | |
+ bulk.put(REGIONINFO_A, SERVERNAME_A); | |
+ bulk.put(REGIONINFO_B, SERVERNAME_B); | |
+ bulk.put(REGIONINFO_C, SERVERNAME_C); | |
+ bulk.put(REGIONINFO_D, SERVERNAME_D); | |
+ bulk.put(REGIONINFO_E, SERVERNAME_E); | |
- ServerManager sm = master.getServerManager(); | |
+ ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), | |
+ "zkWatcher-BulkAssignTest", abortable, true); | |
- Collection<HRegion> regionsBefore = drainingServer.getOnlineRegionsLocalContext(); | |
- LOG.info("Regions of drained server are: "+ regionsBefore ); | |
+ Mockito.when(server.getConfiguration()).thenReturn(conf); | |
+ Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1")); | |
+ Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher); | |
- try { | |
- // Add first server to draining servers up in zk. | |
- setDrainingServer(drainingServer); | |
+ Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers); | |
+ Mockito.when(serverManager.getOnlineServersList()).thenReturn( | |
+ new ArrayList<ServerName>(onlineServers.keySet())); | |
+ | |
+ Mockito.when(serverManager.createDestinationServersList()).thenReturn( | |
+ new ArrayList<ServerName>(onlineServers.keySet())); | |
+ Mockito.when(serverManager.createDestinationServersList(null)).thenReturn( | |
+ new ArrayList<ServerName>(onlineServers.keySet())); | |
+ | |
+ for (Entry<HRegionInfo, ServerName> entry : bulk.entrySet()) { | |
+ Mockito.when(serverManager.isServerOnline(entry.getValue())).thenReturn(true); | |
+ Mockito.when(serverManager.sendRegionClose(entry.getValue(), | |
+ entry.getKey(), -1)).thenReturn(true); | |
+ Mockito.when(serverManager.sendRegionOpen(entry.getValue(), | |
+ entry.getKey(), -1, null)).thenReturn(RegionOpeningState.OPENED); | |
+ Mockito.when(serverManager.addServerToDrainList(entry.getValue())).thenReturn(true); | |
+ } | |
+ | |
+ Mockito.when(master.getServerManager()).thenReturn(serverManager); | |
- //wait for the master to receive and manage the event | |
- while (sm.createDestinationServersList().contains(drainingServer.getServerName())) { | |
- Thread.sleep(1); | |
- } | |
+ drainedServers.add(SERVERNAME_A); | |
+ drainedServers.add(SERVERNAME_B); | |
+ drainedServers.add(SERVERNAME_C); | |
+ drainedServers.add(SERVERNAME_D); | |
- LOG.info("The available servers are: "+ sm.createDestinationServersList()); | |
+ am = new AssignmentManager(server, serverManager, catalogTracker, | |
+ balancer, startupMasterExecutor("mockExecutorServiceBulk"), null, null); | |
+ | |
+ Mockito.when(master.getAssignmentManager()).thenReturn(am); | |
- Assert.assertEquals("Nothing should have happened here.", regionsOnDrainingServer, | |
- drainingServer.getNumberOfOnlineRegions()); | |
- Assert.assertFalse("We should not have regions in transition here. List is: " + | |
- master.getAssignmentManager().getRegionStates().getRegionsInTransition(), | |
- master.getAssignmentManager().getRegionStates().isRegionsInTransition()); | |
+ zkWatcher.registerListener(am); | |
+ | |
+ for (ServerName drained : drainedServers) { | |
+ addServerToDrainedList(drained, onlineServers, serverManager); | |
+ } | |
+ | |
+ am.assign(bulk); | |
- // Kill a few regionservers. | |
- for (int aborted = 0; aborted <= 2; aborted++) { | |
- HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(aborted + 1); | |
- hrs.abort("Aborting"); | |
- } | |
- | |
- // Wait for regions to come back online again. waitForAllRegionsOnline can come back before | |
- // we've assigned out regions on the cluster so retry if we are shy the wanted number | |
- Collection<HRegion> regionsAfter = null; | |
- for (int i = 0; i < 1000; i++) { | |
- waitForAllRegionsOnline(); | |
- regionsAfter = getRegions(); | |
- if (regionsAfter.size() >= regionCount) break; | |
- LOG.info("Expecting " + regionCount + " but only " + regionsAfter); | |
- Threads.sleep(10); | |
- } | |
- LOG.info("Regions of drained server: " + regionsAfter + ", all regions: " + getRegions()); | |
- Assert.assertEquals("Test conditions are not met: regions were" + | |
- " created/deleted during the test. ", | |
- regionCount, TEST_UTIL.getMiniHBaseCluster().countServedRegions()); | |
- | |
- // Assert the draining server still has the same regions. | |
- regionsAfter = drainingServer.getOnlineRegionsLocalContext(); | |
- StringBuilder result = new StringBuilder(); | |
- for (HRegion r: regionsAfter){ | |
- if (!regionsBefore.contains(r)){ | |
- result.append(r).append(" was added after the drain"); | |
- if (regions.contains(r)){ | |
- result.append("(existing region"); | |
- } else { | |
- result.append("(new region)"); | |
- } | |
- result.append("; "); | |
- } | |
- } | |
- for (HRegion r: regionsBefore){ | |
- if (!regionsAfter.contains(r)){ | |
- result.append(r).append(" was removed after the drain; "); | |
- } | |
- } | |
- Assert.assertTrue("Errors are: "+ result.toString(), result.length()==0); | |
- | |
- } finally { | |
- unsetDrainingServer(drainingServer); | |
+ Map<String, RegionState> regionsInTransition = am.getRegionStates().getRegionsInTransition(); | |
+ for (Entry<String, RegionState> entry : regionsInTransition.entrySet()) { | |
+ setRegionOpenedOnZK(zkWatcher, entry.getValue().getServerName(), | |
+ entry.getValue().getRegion()); | |
+ } | |
+ | |
+ am.waitForAssignment(REGIONINFO_A); | |
+ am.waitForAssignment(REGIONINFO_B); | |
+ am.waitForAssignment(REGIONINFO_C); | |
+ am.waitForAssignment(REGIONINFO_D); | |
+ am.waitForAssignment(REGIONINFO_E); | |
+ | |
+ Map<HRegionInfo, ServerName> regionAssignments = am.getRegionStates().getRegionAssignments(); | |
+ for (Entry<HRegionInfo, ServerName> entry : regionAssignments.entrySet()) { | |
+ LOG.info("Region Assignment: " | |
+ + entry.getKey().getRegionNameAsString() + " Server: " + entry.getValue()); | |
+ bunchServersAssigned.add(entry.getValue()); | |
+ } | |
+ | |
+ for (ServerName sn : drainedServers) { | |
+ assertFalse(bunchServersAssigned.contains(sn)); | |
} | |
} | |
- private Collection<HRegion> getRegions() { | |
- Collection<HRegion> regions = new ArrayList<HRegion>(); | |
- List<RegionServerThread> rsthreads = | |
- TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads(); | |
- for (RegionServerThread t: rsthreads) { | |
- HRegionServer rs = t.getRegionServer(); | |
- Collection<HRegion> lr = rs.getOnlineRegionsLocalContext(); | |
- LOG.info("Found " + lr + " on " + rs); | |
- regions.addAll(lr); | |
- } | |
- return regions; | |
+ private void addServerToDrainedList(ServerName serverName, | |
+ Map<ServerName, ServerLoad> onlineServers, ServerManager serverManager) { | |
+ onlineServers.remove(serverName); | |
+ List<ServerName> availableServers = new ArrayList<ServerName>(onlineServers.keySet()); | |
+ Mockito.when(serverManager.createDestinationServersList()).thenReturn(availableServers); | |
+ Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(availableServers); | |
} | |
- private static void waitForAllRegionsOnline() throws Exception { | |
- // Wait for regions to come back on line again. | |
- boolean done = false; | |
- while (!done) { | |
- Thread.sleep(1); | |
- | |
- // Nothing in ZK RIT for a start | |
- ZKAssign.blockUntilNoRIT(TEST_UTIL.getZooKeeperWatcher()); | |
- | |
- // Then we want all the regions to be marked as available... | |
- if (!isAllRegionsOnline()) continue; | |
- | |
- // And without any work in progress on the master side | |
- if (TEST_UTIL.getMiniHBaseCluster().getMaster(). | |
- getAssignmentManager().getRegionStates().isRegionsInTransition()) continue; | |
- | |
- // nor on the region server side | |
- done = true; | |
- for (JVMClusterUtil.RegionServerThread rs : | |
- TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) { | |
- if (!rs.getRegionServer().getRegionsInTransitionInRS().isEmpty()) { | |
- done = false; | |
- } | |
- // Sleep some else we spam the log w/ notice that servers are not yet alive. | |
- Threads.sleep(10); | |
- } | |
- } | |
+ private void setRegionOpenedOnZK(final ZooKeeperWatcher zkWatcher, final ServerName serverName, | |
+ HRegionInfo hregionInfo) throws KeeperException, InterruptedException { | |
+ int version = ZKAssign.getVersion(zkWatcher, hregionInfo); | |
+ int versionTransition = ZKAssign.transitionNode(zkWatcher, | |
+ hregionInfo, serverName, EventType.M_ZK_REGION_OFFLINE, | |
+ EventType.RS_ZK_REGION_OPENING, version); | |
+ version = ZKAssign.getVersion(zkWatcher, hregionInfo); | |
+ ZKAssign.transitionNodeOpened(zkWatcher, hregionInfo, serverName, versionTransition); | |
} | |
- private static boolean isAllRegionsOnline() { | |
- return TEST_UTIL.getMiniHBaseCluster().countServedRegions() >= | |
- (COUNT_OF_REGIONS + 2 /*catalog and namespace regions*/); | |
+ private ExecutorService startupMasterExecutor(final String name) { | |
+ ExecutorService executor = new ExecutorService(name); | |
+ executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3); | |
+ executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3); | |
+ executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3); | |
+ executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3); | |
+ return executor; | |
} | |
} | |
\ No newline at end of file |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment