Created
August 12, 2013 01:39
-
-
Save gustavoanatoly/6207730 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.apache.hadoop.hbase; | |
import com.google.protobuf.ServiceException; | |
import org.apache.commons.logging.Log; | |
import org.apache.commons.logging.LogFactory; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.hbase.catalog.CatalogTracker; | |
import org.apache.hadoop.hbase.executor.EventType; | |
import org.apache.hadoop.hbase.executor.ExecutorService; | |
import org.apache.hadoop.hbase.executor.ExecutorType; | |
import org.apache.hadoop.hbase.master.*; | |
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; | |
import org.apache.hadoop.hbase.regionserver.RegionOpeningState; | |
import org.apache.hadoop.hbase.util.Bytes; | |
import org.apache.hadoop.hbase.zookeeper.ZKAssign; | |
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; | |
import org.apache.zookeeper.KeeperException; | |
import org.junit.AfterClass; | |
import org.junit.BeforeClass; | |
import org.junit.Test; | |
import org.junit.experimental.categories.Category; | |
import org.mockito.Mockito; | |
import java.io.IOException; | |
import java.util.*; | |
import java.util.Map.Entry; | |
import static org.junit.Assert.*; | |
@Category(MediumTests.class) | |
public class TestDrainingServerFinal { | |
private static final Log LOG = LogFactory.getLog(TestDrainingServer.class); | |
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); | |
private Abortable abortable = new Abortable() { | |
@Override | |
public boolean isAborted() { | |
return false; | |
} | |
@Override | |
public void abort(String why, Throwable e) { | |
} | |
}; | |
@AfterClass | |
public static void afterClass() throws Exception { | |
TEST_UTIL.shutdownMiniZKCluster(); | |
} | |
@BeforeClass | |
public static void beforeClass() throws Exception { | |
TEST_UTIL.startMiniZKCluster(); | |
} | |
/** | |
* This is an unit test, to verify if Assignment Manager | |
* doesn't use a drained server. | |
* @throws Exception | |
*/ | |
@Test | |
public void testAssignmentManagerDoesntUseDrainingServer() throws Exception { | |
AssignmentManager am = null; | |
Configuration conf = TEST_UTIL.getConfiguration(); | |
final HMaster master = Mockito.mock(HMaster.class); | |
final Server server = Mockito.mock(Server.class); | |
final ServerManager serverManager = Mockito.mock(ServerManager.class); | |
final ServerName SERVERNAME_A = new ServerName("mockserver.org", 1000, 8000); | |
final ServerName SERVERNAME_B = new ServerName("mockserver.org", 1001, 8000); | |
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf); | |
CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class); | |
final HRegionInfo REGIONINFO = new HRegionInfo(Bytes.toBytes("table_mock_test"), | |
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
List<ServerName> favored = new ArrayList<ServerName>(); | |
favored.add(SERVERNAME_A); | |
favored.add(SERVERNAME_B); | |
ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "zkWatcher-Test", abortable, true); | |
Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>(); | |
onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD); | |
onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD); | |
Mockito.when(server.getConfiguration()).thenReturn(conf); | |
Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1")); | |
Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher); | |
Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers); | |
Mockito.when(serverManager.getOnlineServersList()).thenReturn(new ArrayList<ServerName>(onlineServers.keySet())); | |
Mockito.when(serverManager.createDestinationServersList()).thenReturn(new ArrayList<ServerName>(onlineServers.keySet())); | |
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(new ArrayList<ServerName>(onlineServers.keySet())); | |
for (ServerName sn : onlineServers.keySet()) { | |
Mockito.when(serverManager.isServerOnline(sn)).thenReturn(true); | |
Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1)).thenReturn(true); | |
Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1, null, false)).thenReturn(true); | |
Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, new ArrayList<ServerName>())).thenReturn( | |
RegionOpeningState.OPENED); | |
Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, null)).thenReturn(RegionOpeningState.OPENED); | |
Mockito.when(serverManager.addServerToDrainList(sn)).thenReturn(true); | |
} | |
Mockito.when(master.getServerManager()).thenReturn(serverManager); | |
am = new AssignmentManager(server, serverManager, catalogTracker, | |
balancer, startupMasterExecutor("mockExecutorService"), null, null); | |
Mockito.when(master.getAssignmentManager()).thenReturn(am); | |
Mockito.when(master.getZooKeeperWatcher()).thenReturn(zkWatcher); | |
Mockito.when(master.getZooKeeper()).thenReturn(zkWatcher); | |
am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_A)); | |
am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_B)); | |
zkWatcher.registerListenerFirst(am); | |
addServerToDrainedList(SERVERNAME_A, onlineServers, serverManager); | |
am.assign(REGIONINFO, true); | |
setRegionOpenedOnZK(zkWatcher, SERVERNAME_A, REGIONINFO); | |
setRegionOpenedOnZK(zkWatcher, SERVERNAME_B, REGIONINFO); | |
am.waitForAssignment(REGIONINFO); | |
assertTrue(am.getRegionStates().isRegionAssigned(REGIONINFO)); | |
assertNotEquals(am.getRegionStates().getRegionServerOfRegion(REGIONINFO), SERVERNAME_A); | |
} | |
@Test | |
public void testAssignmentManagerDoesntUseDrainedServerWithBulkAssign() throws ServiceException, | |
InterruptedException, IOException, KeeperException, NoSuchFieldException, IllegalAccessException { | |
Configuration conf = TEST_UTIL.getConfiguration(); | |
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf); | |
CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class); | |
AssignmentManager am = null; | |
final HMaster master = Mockito.mock(HMaster.class); | |
final Server server = Mockito.mock(Server.class); | |
final ServerManager serverManager = Mockito.mock(ServerManager.class); | |
final ServerName SERVERNAME_A = new ServerName("mockserverbulk.org", 1000, 8000); | |
final ServerName SERVERNAME_B = new ServerName("mockserverbulk.org", 1001, 8000); | |
final ServerName SERVERNAME_C = new ServerName("mockserverbulk.org", 1002, 8000); | |
final ServerName SERVERNAME_D = new ServerName("mockserverbulk.org", 1003, 8000); | |
final ServerName SERVERNAME_E = new ServerName("mockserverbulk.org", 1004, 8000); | |
final Map<HRegionInfo, ServerName> bulk = new HashMap<HRegionInfo, ServerName>(); | |
Set<ServerName> bunchServersAssigned = new HashSet<ServerName>(); | |
HRegionInfo REGIONINFO_A = new HRegionInfo(Bytes.toBytes("table_A"), | |
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
HRegionInfo REGIONINFO_B = new HRegionInfo(Bytes.toBytes("table_B"), | |
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
HRegionInfo REGIONINFO_C = new HRegionInfo(Bytes.toBytes("table_C"), | |
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
HRegionInfo REGIONINFO_D = new HRegionInfo(Bytes.toBytes("table_D"), | |
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
HRegionInfo REGIONINFO_E = new HRegionInfo(Bytes.toBytes("table_E"), | |
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW); | |
Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>(); | |
List<ServerName> drainedServers = new ArrayList<ServerName>(); | |
onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD); | |
onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD); | |
onlineServers.put(SERVERNAME_C, ServerLoad.EMPTY_SERVERLOAD); | |
onlineServers.put(SERVERNAME_D, ServerLoad.EMPTY_SERVERLOAD); | |
onlineServers.put(SERVERNAME_E, ServerLoad.EMPTY_SERVERLOAD); | |
bulk.put(REGIONINFO_A, SERVERNAME_A); | |
bulk.put(REGIONINFO_B, SERVERNAME_B); | |
bulk.put(REGIONINFO_C, SERVERNAME_C); | |
bulk.put(REGIONINFO_D, SERVERNAME_D); | |
bulk.put(REGIONINFO_E, SERVERNAME_E); | |
ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "zkWatcher-BulkAssignTest", abortable, true); | |
Mockito.when(server.getConfiguration()).thenReturn(conf); | |
Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1")); | |
Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher); | |
Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers); | |
Mockito.when(serverManager.getOnlineServersList()).thenReturn( | |
new ArrayList<ServerName>(onlineServers.keySet())); | |
Mockito.when(serverManager.createDestinationServersList()).thenReturn( | |
new ArrayList<ServerName>(onlineServers.keySet())); | |
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn( | |
new ArrayList<ServerName>(onlineServers.keySet())); | |
for (Entry<HRegionInfo, ServerName> entry : bulk.entrySet()) { | |
Mockito.when(serverManager.isServerOnline(entry.getValue())).thenReturn(true); | |
Mockito.when(serverManager.sendRegionClose(entry.getValue(), entry.getKey(), -1)).thenReturn(true); | |
Mockito.when(serverManager.sendRegionOpen(entry.getValue(), entry.getKey(), -1, null)).thenReturn( | |
RegionOpeningState.OPENED); | |
Mockito.when(serverManager.addServerToDrainList(entry.getValue())).thenReturn(true); | |
} | |
Mockito.when(master.getServerManager()).thenReturn(serverManager); | |
drainedServers.add(SERVERNAME_A); | |
drainedServers.add(SERVERNAME_B); | |
drainedServers.add(SERVERNAME_C); | |
drainedServers.add(SERVERNAME_D); | |
am = new AssignmentManager(server, serverManager, catalogTracker, | |
balancer, startupMasterExecutor("mockExecutorServiceBulk"), null, null); | |
Mockito.when(master.getAssignmentManager()).thenReturn(am); | |
zkWatcher.registerListener(am); | |
for (ServerName drained : drainedServers) { | |
addServerToDrainedList(drained, onlineServers, serverManager); | |
} | |
am.assign(bulk); | |
for (Entry<String, RegionState> entry : am.getRegionStates().getRegionsInTransition().entrySet()) { | |
setRegionOpenedOnZK(zkWatcher, entry.getValue().getServerName(), entry.getValue().getRegion()); | |
} | |
am.waitForAssignment(REGIONINFO_A); | |
am.waitForAssignment(REGIONINFO_B); | |
am.waitForAssignment(REGIONINFO_C); | |
am.waitForAssignment(REGIONINFO_D); | |
am.waitForAssignment(REGIONINFO_E); | |
for (Entry<HRegionInfo, ServerName> entry : am.getRegionStates().getRegionAssignments().entrySet()) { | |
LOG.info("Region Assignment: " + entry.getKey().getRegionNameAsString() + " Server: " + entry.getValue()); | |
bunchServersAssigned.add(entry.getValue()); | |
} | |
for (ServerName sn : drainedServers) { | |
assertFalse(bunchServersAssigned.contains(sn)); | |
} | |
} | |
private void addServerToDrainedList(ServerName serverName, Map<ServerName, ServerLoad> onlineServers, ServerManager serverManager) { | |
onlineServers.remove(serverName); | |
List<ServerName> availableServers = new ArrayList<ServerName>(onlineServers.keySet()); | |
Mockito.when(serverManager.createDestinationServersList()).thenReturn(availableServers); | |
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(availableServers); | |
} | |
/** | |
* Put region opened on ZooKeeper | |
* | |
* @param zkWatcher | |
* @param serverName | |
* @param hregionInfo | |
* @throws KeeperException | |
* @throws InterruptedException | |
*/ | |
private void setRegionOpenedOnZK(final ZooKeeperWatcher zkWatcher, | |
final ServerName serverName, HRegionInfo hregionInfo) throws KeeperException, InterruptedException { | |
int version = ZKAssign.getVersion(zkWatcher, hregionInfo); | |
int versionTransition = ZKAssign.transitionNode(zkWatcher, | |
hregionInfo, serverName, EventType.M_ZK_REGION_OFFLINE, | |
EventType.RS_ZK_REGION_OPENING, version); | |
version = ZKAssign.getVersion(zkWatcher, hregionInfo); | |
ZKAssign.transitionNodeOpened(zkWatcher, hregionInfo, serverName, versionTransition); | |
} | |
private ExecutorService startupMasterExecutor(final String name) { | |
// TODO: Move up into HBaseTestingUtility? Generally useful. | |
ExecutorService executor = new ExecutorService(name); | |
executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3); | |
executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3); | |
executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3); | |
executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3); | |
return executor; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment