Skip to content

Instantly share code, notes, and snippets.

@gustavoanatoly
Created August 12, 2013 01:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gustavoanatoly/6207730 to your computer and use it in GitHub Desktop.
Save gustavoanatoly/6207730 to your computer and use it in GitHub Desktop.
package org.apache.hadoop.hbase;
import com.google.protobuf.ServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorType;
import org.apache.hadoop.hbase.master.*;
import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import static org.junit.Assert.*;
@Category(MediumTests.class)
public class TestDrainingServerFinal {
private static final Log LOG = LogFactory.getLog(TestDrainingServer.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private Abortable abortable = new Abortable() {
@Override
public boolean isAborted() {
return false;
}
@Override
public void abort(String why, Throwable e) {
}
};
@AfterClass
public static void afterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();
}
@BeforeClass
public static void beforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
}
/**
* This is an unit test, to verify if Assignment Manager
* doesn't use a drained server.
* @throws Exception
*/
@Test
public void testAssignmentManagerDoesntUseDrainingServer() throws Exception {
AssignmentManager am = null;
Configuration conf = TEST_UTIL.getConfiguration();
final HMaster master = Mockito.mock(HMaster.class);
final Server server = Mockito.mock(Server.class);
final ServerManager serverManager = Mockito.mock(ServerManager.class);
final ServerName SERVERNAME_A = new ServerName("mockserver.org", 1000, 8000);
final ServerName SERVERNAME_B = new ServerName("mockserver.org", 1001, 8000);
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf);
CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class);
final HRegionInfo REGIONINFO = new HRegionInfo(Bytes.toBytes("table_mock_test"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
List<ServerName> favored = new ArrayList<ServerName>();
favored.add(SERVERNAME_A);
favored.add(SERVERNAME_B);
ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "zkWatcher-Test", abortable, true);
Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
Mockito.when(server.getConfiguration()).thenReturn(conf);
Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1"));
Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
Mockito.when(serverManager.getOnlineServersList()).thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
Mockito.when(serverManager.createDestinationServersList()).thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(new ArrayList<ServerName>(onlineServers.keySet()));
for (ServerName sn : onlineServers.keySet()) {
Mockito.when(serverManager.isServerOnline(sn)).thenReturn(true);
Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1)).thenReturn(true);
Mockito.when(serverManager.sendRegionClose(sn, REGIONINFO, -1, null, false)).thenReturn(true);
Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, new ArrayList<ServerName>())).thenReturn(
RegionOpeningState.OPENED);
Mockito.when(serverManager.sendRegionOpen(sn, REGIONINFO, -1, null)).thenReturn(RegionOpeningState.OPENED);
Mockito.when(serverManager.addServerToDrainList(sn)).thenReturn(true);
}
Mockito.when(master.getServerManager()).thenReturn(serverManager);
am = new AssignmentManager(server, serverManager, catalogTracker,
balancer, startupMasterExecutor("mockExecutorService"), null, null);
Mockito.when(master.getAssignmentManager()).thenReturn(am);
Mockito.when(master.getZooKeeperWatcher()).thenReturn(zkWatcher);
Mockito.when(master.getZooKeeper()).thenReturn(zkWatcher);
am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_A));
am.addPlan(REGIONINFO.getEncodedName(), new RegionPlan(REGIONINFO, null, SERVERNAME_B));
zkWatcher.registerListenerFirst(am);
addServerToDrainedList(SERVERNAME_A, onlineServers, serverManager);
am.assign(REGIONINFO, true);
setRegionOpenedOnZK(zkWatcher, SERVERNAME_A, REGIONINFO);
setRegionOpenedOnZK(zkWatcher, SERVERNAME_B, REGIONINFO);
am.waitForAssignment(REGIONINFO);
assertTrue(am.getRegionStates().isRegionAssigned(REGIONINFO));
assertNotEquals(am.getRegionStates().getRegionServerOfRegion(REGIONINFO), SERVERNAME_A);
}
@Test
public void testAssignmentManagerDoesntUseDrainedServerWithBulkAssign() throws ServiceException,
InterruptedException, IOException, KeeperException, NoSuchFieldException, IllegalAccessException {
Configuration conf = TEST_UTIL.getConfiguration();
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(conf);
CatalogTracker catalogTracker = Mockito.mock(CatalogTracker.class);
AssignmentManager am = null;
final HMaster master = Mockito.mock(HMaster.class);
final Server server = Mockito.mock(Server.class);
final ServerManager serverManager = Mockito.mock(ServerManager.class);
final ServerName SERVERNAME_A = new ServerName("mockserverbulk.org", 1000, 8000);
final ServerName SERVERNAME_B = new ServerName("mockserverbulk.org", 1001, 8000);
final ServerName SERVERNAME_C = new ServerName("mockserverbulk.org", 1002, 8000);
final ServerName SERVERNAME_D = new ServerName("mockserverbulk.org", 1003, 8000);
final ServerName SERVERNAME_E = new ServerName("mockserverbulk.org", 1004, 8000);
final Map<HRegionInfo, ServerName> bulk = new HashMap<HRegionInfo, ServerName>();
Set<ServerName> bunchServersAssigned = new HashSet<ServerName>();
HRegionInfo REGIONINFO_A = new HRegionInfo(Bytes.toBytes("table_A"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
HRegionInfo REGIONINFO_B = new HRegionInfo(Bytes.toBytes("table_B"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
HRegionInfo REGIONINFO_C = new HRegionInfo(Bytes.toBytes("table_C"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
HRegionInfo REGIONINFO_D = new HRegionInfo(Bytes.toBytes("table_D"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
HRegionInfo REGIONINFO_E = new HRegionInfo(Bytes.toBytes("table_E"),
HConstants.EMPTY_START_ROW, HConstants.EMPTY_START_ROW);
Map<ServerName, ServerLoad> onlineServers = new HashMap<ServerName, ServerLoad>();
List<ServerName> drainedServers = new ArrayList<ServerName>();
onlineServers.put(SERVERNAME_A, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_B, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_C, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_D, ServerLoad.EMPTY_SERVERLOAD);
onlineServers.put(SERVERNAME_E, ServerLoad.EMPTY_SERVERLOAD);
bulk.put(REGIONINFO_A, SERVERNAME_A);
bulk.put(REGIONINFO_B, SERVERNAME_B);
bulk.put(REGIONINFO_C, SERVERNAME_C);
bulk.put(REGIONINFO_D, SERVERNAME_D);
bulk.put(REGIONINFO_E, SERVERNAME_E);
ZooKeeperWatcher zkWatcher = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), "zkWatcher-BulkAssignTest", abortable, true);
Mockito.when(server.getConfiguration()).thenReturn(conf);
Mockito.when(server.getServerName()).thenReturn(new ServerName("masterMock,1,1"));
Mockito.when(server.getZooKeeper()).thenReturn(zkWatcher);
Mockito.when(serverManager.getOnlineServers()).thenReturn(onlineServers);
Mockito.when(serverManager.getOnlineServersList()).thenReturn(
new ArrayList<ServerName>(onlineServers.keySet()));
Mockito.when(serverManager.createDestinationServersList()).thenReturn(
new ArrayList<ServerName>(onlineServers.keySet()));
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(
new ArrayList<ServerName>(onlineServers.keySet()));
for (Entry<HRegionInfo, ServerName> entry : bulk.entrySet()) {
Mockito.when(serverManager.isServerOnline(entry.getValue())).thenReturn(true);
Mockito.when(serverManager.sendRegionClose(entry.getValue(), entry.getKey(), -1)).thenReturn(true);
Mockito.when(serverManager.sendRegionOpen(entry.getValue(), entry.getKey(), -1, null)).thenReturn(
RegionOpeningState.OPENED);
Mockito.when(serverManager.addServerToDrainList(entry.getValue())).thenReturn(true);
}
Mockito.when(master.getServerManager()).thenReturn(serverManager);
drainedServers.add(SERVERNAME_A);
drainedServers.add(SERVERNAME_B);
drainedServers.add(SERVERNAME_C);
drainedServers.add(SERVERNAME_D);
am = new AssignmentManager(server, serverManager, catalogTracker,
balancer, startupMasterExecutor("mockExecutorServiceBulk"), null, null);
Mockito.when(master.getAssignmentManager()).thenReturn(am);
zkWatcher.registerListener(am);
for (ServerName drained : drainedServers) {
addServerToDrainedList(drained, onlineServers, serverManager);
}
am.assign(bulk);
for (Entry<String, RegionState> entry : am.getRegionStates().getRegionsInTransition().entrySet()) {
setRegionOpenedOnZK(zkWatcher, entry.getValue().getServerName(), entry.getValue().getRegion());
}
am.waitForAssignment(REGIONINFO_A);
am.waitForAssignment(REGIONINFO_B);
am.waitForAssignment(REGIONINFO_C);
am.waitForAssignment(REGIONINFO_D);
am.waitForAssignment(REGIONINFO_E);
for (Entry<HRegionInfo, ServerName> entry : am.getRegionStates().getRegionAssignments().entrySet()) {
LOG.info("Region Assignment: " + entry.getKey().getRegionNameAsString() + " Server: " + entry.getValue());
bunchServersAssigned.add(entry.getValue());
}
for (ServerName sn : drainedServers) {
assertFalse(bunchServersAssigned.contains(sn));
}
}
private void addServerToDrainedList(ServerName serverName, Map<ServerName, ServerLoad> onlineServers, ServerManager serverManager) {
onlineServers.remove(serverName);
List<ServerName> availableServers = new ArrayList<ServerName>(onlineServers.keySet());
Mockito.when(serverManager.createDestinationServersList()).thenReturn(availableServers);
Mockito.when(serverManager.createDestinationServersList(null)).thenReturn(availableServers);
}
/**
* Put region opened on ZooKeeper
*
* @param zkWatcher
* @param serverName
* @param hregionInfo
* @throws KeeperException
* @throws InterruptedException
*/
private void setRegionOpenedOnZK(final ZooKeeperWatcher zkWatcher,
final ServerName serverName, HRegionInfo hregionInfo) throws KeeperException, InterruptedException {
int version = ZKAssign.getVersion(zkWatcher, hregionInfo);
int versionTransition = ZKAssign.transitionNode(zkWatcher,
hregionInfo, serverName, EventType.M_ZK_REGION_OFFLINE,
EventType.RS_ZK_REGION_OPENING, version);
version = ZKAssign.getVersion(zkWatcher, hregionInfo);
ZKAssign.transitionNodeOpened(zkWatcher, hregionInfo, serverName, versionTransition);
}
private ExecutorService startupMasterExecutor(final String name) {
// TODO: Move up into HBaseTestingUtility? Generally useful.
ExecutorService executor = new ExecutorService(name);
executor.startExecutorService(ExecutorType.MASTER_OPEN_REGION, 3);
executor.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, 3);
executor.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS, 3);
executor.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS, 3);
return executor;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment