Skip to content

Instantly share code, notes, and snippets.

@gustavoanatoly
Created August 17, 2013 22:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gustavoanatoly/6258956 to your computer and use it in GitHub Desktop.
Save gustavoanatoly/6258956 to your computer and use it in GitHub Desktop.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
index c1723e2..6488b32 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
@@ -18,33 +18,41 @@
package org.apache.hadoop.hbase;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
-import junit.framework.Assert;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.master.LoadBalancer;
+import org.apache.hadoop.hbase.master.RegionPlan;
+import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
+import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
/**
@@ -54,258 +62,255 @@
* master does no assign regions to a regionserver marked as drained.
*
* @see <a href="https://issues.apache.org/jira/browse/HBASE-4298">HBASE-4298</a>
+ *
+ *
+ * TestDrainingServer is an integration test. It should be a unit test instead
+ * @see <a href="https://issues.apache.org/jira/browse/HBASE-7462">HBASE-7462</a>
*/
@Category(MediumTests.class)
public class TestDrainingServer {
private static final Log LOG = LogFactory.getLog(TestDrainingServer.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static final int NB_SLAVES = 5;
- private static final int COUNT_OF_REGIONS = NB_SLAVES * 2;
-
- /**
- * Spin up a cluster with a bunch of regions on it.
- */
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- TEST_UTIL.startMiniCluster(NB_SLAVES);
- TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
- TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
-
- final List<String> families = new ArrayList<String>(1);
- families.add("family");
- TEST_UTIL.createRandomTable("table", families, 1, 0, 0, COUNT_OF_REGIONS, 0);
- // Ensure a stable env
- TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
-
- boolean ready = false;
- while (!ready){
- waitForAllRegionsOnline();
-
- // Assert that every regionserver has some regions on it.
- int i = 0;
- ready = true;
- while (i < NB_SLAVES && ready){
- HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(i);
- if (ProtobufUtil.getOnlineRegions(hrs).isEmpty()){
- ready = false;
- }
- i++;
- }
-
- if (!ready){
- TEST_UTIL.getHBaseAdmin().setBalancerRunning(true, true);
- Assert.assertTrue("Can't start a balance!", TEST_UTIL.getHBaseAdmin().balancer());
- TEST_UTIL.getHBaseAdmin().setBalancerRunning(false, false);
- Thread.sleep(100);
- }
+ private Abortable abortable = new Abortable() {
+ @Override
+ public boolean isAborted() {
+ return false;
}
- }
- private static HRegionServer setDrainingServer(final HRegionServer hrs)
- throws KeeperException {
- LOG.info("Making " + hrs.getServerName() + " the draining server; " +
- "it has " + hrs.getNumberOfOnlineRegions() + " online regions");
- ZooKeeperWatcher zkw = hrs.getZooKeeper();
- String hrsDrainingZnode =
- ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString());
- ZKUtil.createWithParents(zkw, hrsDrainingZnode);
- return hrs;
- }
+ @Override
+ public void abort(String why, Throwable e) {
- private static HRegionServer unsetDrainingServer(final HRegionServer hrs)
- throws KeeperException {
- ZooKeeperWatcher zkw = hrs.getZooKeeper();
- String hrsDrainingZnode =
- ZKUtil.joinZNode(zkw.drainingZNode, hrs.getServerName().toString());
- ZKUtil.deleteNode(zkw, hrsDrainingZnode);
- return hrs;
- }
+ }
+ };
@AfterClass
- public static void tearDownAfterClass() throws Exception {
- TEST_UTIL.shutdownMiniCluster();
+ public static void afterClass() throws Exception {
+ TEST_UTIL.shutdownMiniZKCluster();
+ }
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
}
- /**
- * Test adding server to draining servers and then move regions off it.
- * Make sure that no regions are moved back to the draining server.
- * @throws IOException
- * @throws KeeperException
- */
- @Test // (timeout=30000)
- public void testDrainingServerOffloading()
- throws Exception {
- // I need master in the below.
- HMaster master = TEST_UTIL.getMiniHBaseCluster().getMaster();
- HRegionInfo hriToMoveBack = null;
- // Set first server as draining server.
- HRegionServer drainingServer =
- setDrainingServer(TEST_UTIL.getMiniHBaseCluster().getRegionServer(0));
- try {
- final int regionsOnDrainingServer =
- drainingServer.getNumberOfOnlineRegions();
- Assert.assertTrue(regionsOnDrainingServer > 0);
- List<HRegionInfo> hris = ProtobufUtil.getOnlineRegions(drainingServer);
- for (HRegionInfo hri : hris) {
- // Pass null and AssignmentManager will chose a random server BUT it
- // should exclude draining servers.
- master.moveRegion(null,
- RequestConverter.buildMoveRegionRequest(hri.getEncodedNameAsBytes(), null));
- // Save off region to move back.
- hriToMoveBack = hri;
- }
- // Wait for regions to come back on line again.
- waitForAllRegionsOnline();
- Assert.assertEquals(0, drainingServer.getNumberOfOnlineRegions());
- } finally {
- unsetDrainingServer(drainingServer);
+ @Test
+ public void testAssignmentManagerDoesntUseDrainingServer() throws Exception {
+ AssignmentManager am = null;
+ Configuration conf = TEST_UTIL.getConfiguration();
+ final HMaster master = Mockito.mock(HMaster.class);
+ final Server server = Mockito.mock(Server.class);
+ final ServerManager serverManager = Mockito.mock(ServerManager.class);
+ final ServerName SERVERNAME_A = new ServerName("mockserver_a.org", 1000, 8000);
+ final ServerName SERVERNAME_B = new ServerName("mockserver_b.org", 1001, 8000);
+ LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf);
+ CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class);
+ final HRegionInfo REGIONINFO = new HRegionInfo(TableName.valueOf("table_test"),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
+
+ List<ServerName> favored = new ArrayList<ServerName>();
+ favored.add(SERVERNAME_A);
+ favored.add(SERVERNAME_B);
+
+ ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+ "zkWatcher-Test", abortable, true);
+
+ Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
+
+ onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
+ onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
+
+
+ Mockito.when(server.getConfiguration()).thenReturn(conf);
+ Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1"));
+ Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
+
+ Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
+ Mockito.when(serverManager.getOnlineServersList())
+ .thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
+
+ Mockito.when(serverManager.createDestinationServersList())
+ .thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
+ Mockito.when(serverManager.createDestinationServersList(null))
+ .thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
+
+ for (ServerName sn : onlineServers.keySet()) {
+ Mockito.when(serverManager.isServerOnline(sn)).thenReturn(true);
+ Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1)).thenReturn(true);
+ Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1, null, false)).thenReturn(true);
+ Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, new ArrayList<ServerName>()))
+ .thenReturn(RegionOpeningState.OPENED);
+ Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, null))
+ .thenReturn(RegionOpeningState.OPENED);
+ Mockito.when(serverManager.addServerToDrainList(sn)).thenReturn(true);
}
- // Now we've unset the draining server, we should be able to move a region
- // to what was the draining server.
- master.moveRegion(null,
- RequestConverter.buildMoveRegionRequest(hriToMoveBack.getEncodedNameAsBytes(),
- Bytes.toBytes(drainingServer.getServerName().toString())));
- // Wait for regions to come back on line again.
- waitForAllRegionsOnline();
- Assert.assertEquals(1, drainingServer.getNumberOfOnlineRegions());
+
+ Mockito.when(master.getServerManager()).thenReturn(serverManager);
+
+ am = new AssignmentManager(server, serverManager, catalogTracker,
+ balancer, startupMasterExecutor("mockExecutorService"), null, null);
+
+ Mockito.when(master.getAssignmentManager()).thenReturn(am);
+ Mockito.when(master.getZooKeeperWatcher()).thenReturn(zkWatcher);
+ Mockito.when(master.getZooKeeper()).thenReturn(zkWatcher);
+
+ am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_A));
+ am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_B));
+
+ zkWatcher.registerListenerFirst(am);
+
+ addServerToDrainedList(SERVERNAME_A, onlineServers, serverManager);
+
+
+ am.assign(REGIONINFO, true);
+
+ setRegionOpenedOnZK(zkWatcher, SERVERNAME_A, REGIONINFO);
+ setRegionOpenedOnZK(zkWatcher, SERVERNAME_B, REGIONINFO);
+
+
+ am.waitForAssignment(REGIONINFO);
+
+
+ assertTrue(am.getRegionStates().isRegionAssigned(REGIONINFO));
+ assertNotEquals(am.getRegionStates().getRegionServerOfRegion(REGIONINFO), SERVERNAME_A);
}
- /**
- * Test that draining servers are ignored even after killing regionserver(s).
- * Verify that the draining server is not given any of the dead servers regions.
- * @throws KeeperException
- * @throws IOException
- */
- @Test (timeout=30000)
- public void testDrainingServerWithAbort() throws KeeperException, Exception {
- HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+ @Test
+ public void testAssignmentManagerDoesntUseDrainedServerWithBulkAssign() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf);
+ CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class);
+ AssignmentManager am = null;
+ final HMaster master = Mockito.mock(HMaster.class);
+ final Server server = Mockito.mock(Server.class);
+ final ServerManager serverManager = Mockito.mock(ServerManager.class);
+ final ServerName SERVERNAME_A = new ServerName("mockserverbulk_a.org", 1000, 8000);
+ final ServerName SERVERNAME_B = new ServerName("mockserverbulk_b.org", 1001, 8000);
+ final ServerName SERVERNAME_C = new ServerName("mockserverbulk_c.org", 1002, 8000);
+ final ServerName SERVERNAME_D = new ServerName("mockserverbulk_d.org", 1003, 8000);
+ final ServerName SERVERNAME_E = new ServerName("mockserverbulk_e.org", 1004, 8000);
+ final Map<HRegionInfo, ServerName> bulk = new HashMap<HRegionInfo, ServerName>();
- waitForAllRegionsOnline();
+ Set<ServerName> bunchServersAssigned = new HashSet<ServerName>();
+
+ HRegionInfo REGIONINFO_A = new HRegionInfo(TableName.valueOf("table_A"),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
+ HRegionInfo REGIONINFO_B = new HRegionInfo(TableName.valueOf("table_B"),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
+ HRegionInfo REGIONINFO_C = new HRegionInfo(TableName.valueOf("table_C"),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
+ HRegionInfo REGIONINFO_D = new HRegionInfo(TableName.valueOf("table_D"),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
+ HRegionInfo REGIONINFO_E = new HRegionInfo(TableName.valueOf("table_E"),
+ HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
- final long regionCount = TEST_UTIL.getMiniHBaseCluster().countServedRegions();
+ Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
+ List<ServerName> drainedServers = new ArrayList<ServerName>();
- // Let's get a copy of the regions today.
- Collection<HRegion> regions = getRegions();
- LOG.info("All regions: " + regions);
+ onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
+ onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
+ onlineServers.put(SERVERNAME_C, ServerLoad.EMPTY_SERVERLOAD);
+ onlineServers.put(SERVERNAME_D, ServerLoad.EMPTY_SERVERLOAD);
+ onlineServers.put(SERVERNAME_E, ServerLoad.EMPTY_SERVERLOAD);
- // Choose the draining server
- HRegionServer drainingServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
- final int regionsOnDrainingServer = drainingServer.getNumberOfOnlineRegions();
- Assert.assertTrue(regionsOnDrainingServer > 0);
+ bulk.put(REGIONINFO_A, SERVERNAME_A);
+ bulk.put(REGIONINFO_B, SERVERNAME_B);
+ bulk.put(REGIONINFO_C, SERVERNAME_C);
+ bulk.put(REGIONINFO_D, SERVERNAME_D);
+ bulk.put(REGIONINFO_E, SERVERNAME_E);
- ServerManager sm = master.getServerManager();
+ ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
+ "zkWatcher-BulkAssignTest", abortable, true);
- Collection<HRegion> regionsBefore = drainingServer.getOnlineRegionsLocalContext();
- LOG.info("Regions of drained server are: "+ regionsBefore );
+ Mockito.when(server.getConfiguration()).thenReturn(conf);
+ Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1"));
+ Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
- try {
- // Add first server to draining servers up in zk.
- setDrainingServer(drainingServer);
+ Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
+ Mockito.when(serverManager.getOnlineServersList()).thenReturn(
+ new ArrayList<ServerName>(onlineServers.keySet()));
+
+ Mockito.when(serverManager.createDestinationServersList()).thenReturn(
+ new ArrayList<ServerName>(onlineServers.keySet()));
+ Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(
+ new ArrayList<ServerName>(onlineServers.keySet()));
+
+ for (Entry<HRegionInfo, ServerName> entry : bulk.entrySet()) {
+ Mockito.when(serverManager.isServerOnline(entry.getValue())).thenReturn(true);
+ Mockito.when(serverManager.sendRegionClose(entry.getValue(),
+ entry.getKey(), -1)).thenReturn(true);
+ Mockito.when(serverManager.sendRegionOpen(entry.getValue(),
+ entry.getKey(), -1, null)).thenReturn(RegionOpeningState.OPENED);
+ Mockito.when(serverManager.addServerToDrainList(entry.getValue())).thenReturn(true);
+ }
+
+ Mockito.when(master.getServerManager()).thenReturn(serverManager);
- //wait for the master to receive and manage the event
- while (sm.createDestinationServersList().contains(drainingServer.getServerName())) {
- Thread.sleep(1);
- }
+ drainedServers.add(SERVERNAME_A);
+ drainedServers.add(SERVERNAME_B);
+ drainedServers.add(SERVERNAME_C);
+ drainedServers.add(SERVERNAME_D);
- LOG.info("The available servers are: "+ sm.createDestinationServersList());
+ am = new AssignmentManager(server, serverManager, catalogTracker,
+ balancer, startupMasterExecutor("mockExecutorServiceBulk"), null, null);
+
+ Mockito.when(master.getAssignmentManager()).thenReturn(am);
- Assert.assertEquals("Nothing should have happened here.", regionsOnDrainingServer,
- drainingServer.getNumberOfOnlineRegions());
- Assert.assertFalse("We should not have regions in transition here. List is: " +
- master.getAssignmentManager().getRegionStates().getRegionsInTransition(),
- master.getAssignmentManager().getRegionStates().isRegionsInTransition());
+ zkWatcher.registerListener(am);
+
+ for (ServerName drained : drainedServers) {
+ addServerToDrainedList(drained, onlineServers, serverManager);
+ }
+
+ am.assign(bulk);
- // Kill a few regionservers.
- for (int aborted = 0; aborted <= 2; aborted++) {
- HRegionServer hrs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(aborted + 1);
- hrs.abort("Aborting");
- }
-
- // Wait for regions to come back online again. waitForAllRegionsOnline can come back before
- // we've assigned out regions on the cluster so retry if we are shy the wanted number
- Collection<HRegion> regionsAfter = null;
- for (int i = 0; i < 1000; i++) {
- waitForAllRegionsOnline();
- regionsAfter = getRegions();
- if (regionsAfter.size() >= regionCount) break;
- LOG.info("Expecting " + regionCount + " but only " + regionsAfter);
- Threads.sleep(10);
- }
- LOG.info("Regions of drained server: " + regionsAfter + ", all regions: " + getRegions());
- Assert.assertEquals("Test conditions are not met: regions were" +
- " created/deleted during the test. ",
- regionCount, TEST_UTIL.getMiniHBaseCluster().countServedRegions());
-
- // Assert the draining server still has the same regions.
- regionsAfter = drainingServer.getOnlineRegionsLocalContext();
- StringBuilder result = new StringBuilder();
- for (HRegion r: regionsAfter){
- if (!regionsBefore.contains(r)){
- result.append(r).append(" was added after the drain");
- if (regions.contains(r)){
- result.append("(existing region");
- } else {
- result.append("(new region)");
- }
- result.append("; ");
- }
- }
- for (HRegion r: regionsBefore){
- if (!regionsAfter.contains(r)){
- result.append(r).append(" was removed after the drain; ");
- }
- }
- Assert.assertTrue("Errors are: "+ result.toString(), result.length()==0);
-
- } finally {
- unsetDrainingServer(drainingServer);
+ Map<String, RegionState> regionsInTransition = am.getRegionStates().getRegionsInTransition();
+ for (Entry<String, RegionState> entry : regionsInTransition.entrySet()) {
+ setRegionOpenedOnZK(zkWatcher, entry.getValue().getServerName(),
+ entry.getValue().getRegion());
+ }
+
+ am.waitForAssignment(REGIONINFO_A);
+ am.waitForAssignment(REGIONINFO_B);
+ am.waitForAssignment(REGIONINFO_C);
+ am.waitForAssignment(REGIONINFO_D);
+ am.waitForAssignment(REGIONINFO_E);
+
+ Map<HRegionInfo, ServerName> regionAssignments = am.getRegionStates().getRegionAssignments();
+ for (Entry<HRegionInfo, ServerName> entry : regionAssignments.entrySet()) {
+ LOG.info("Region Assignment: "
+ + entry.getKey().getRegionNameAsString() + " Server: " + entry.getValue());
+ bunchServersAssigned.add(entry.getValue());
+ }
+
+ for (ServerName sn : drainedServers) {
+ assertFalse(bunchServersAssigned.contains(sn));
}
}
- private Collection<HRegion> getRegions() {
- Collection<HRegion> regions = new ArrayList<HRegion>();
- List<RegionServerThread> rsthreads =
- TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads();
- for (RegionServerThread t: rsthreads) {
- HRegionServer rs = t.getRegionServer();
- Collection<HRegion> lr = rs.getOnlineRegionsLocalContext();
- LOG.info("Found " + lr + " on " + rs);
- regions.addAll(lr);
- }
- return regions;
+ private void addServerToDrainedList(ServerName serverName,
+ Map<ServerName, ServerLoad> onlineServers, ServerManager serverManager) {
+ onlineServers.remove(serverName);
+ List<ServerName> availableServers = new ArrayList<ServerName>(onlineServers.keySet());
+ Mockito.when(serverManager.createDestinationServersList()).thenReturn(availableServers);
+ Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(availableServers);
}
- private static void waitForAllRegionsOnline() throws Exception {
- // Wait for regions to come back on line again.
- boolean done = false;
- while (!done) {
- Thread.sleep(1);
-
- // Nothing in ZK RIT for a start
- ZKAssign.blockUntilNoRIT(TEST_UTIL.getZooKeeperWatcher());
-
- // Then we want all the regions to be marked as available...
- if (!isAllRegionsOnline()) continue;
-
- // And without any work in progress on the master side
- if (TEST_UTIL.getMiniHBaseCluster().getMaster().
- getAssignmentManager().getRegionStates().isRegionsInTransition()) continue;
-
- // nor on the region server side
- done = true;
- for (JVMClusterUtil.RegionServerThread rs :
- TEST_UTIL.getMiniHBaseCluster().getLiveRegionServerThreads()) {
- if (!rs.getRegionServer().getRegionsInTransitionInRS().isEmpty()) {
- done = false;
- }
- // Sleep some else we spam the log w/ notice that servers are not yet alive.
- Threads.sleep(10);
- }
- }
+ private void setRegionOpenedOnZK(final ZooKeeperWatcher zkWatcher, final ServerName serverName,
+ HRegionInfo hregionInfo) throws KeeperException, InterruptedException {
+ int version = ZKAssign.getVersion(zkWatcher, hregionInfo);
+ int versionTransition = ZKAssign.transitionNode(zkWatcher,
+ hregionInfo, serverName, EventType.M_ZK_REGION_OFFLINE,
+ EventType.RS_ZK_REGION_OPENING, version);
+ version = ZKAssign.getVersion(zkWatcher, hregionInfo);
+ ZKAssign.transitionNodeOpened(zkWatcher, hregionInfo, serverName, versionTransition);
}
- private static boolean isAllRegionsOnline() {
- return TEST_UTIL.getMiniHBaseCluster().countServedRegions() >=
- (COUNT_OF_REGIONS + 2 /*catalog and namespace regions*/);
+ private ExecutorService startupMasterExecutor(final String name) {
+ ExecutorService executor = new ExecutorService(name);
+ executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3);
+ executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3);
+ executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3);
+ executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3);
+ return executor;
}
}
\ No newline at end of file
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment