Created
November 4, 2015 17:16
-
-
Save aajisaka/0b51cdb6f484fa05ac78 to your computer and use it in GitHub Desktop.
Created gist to add comments
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.hadoop.hdfs.server.blockmanagement; | |
import static org.apache.hadoop.util.Time.monotonicNow; | |
import java.util.*; | |
import org.apache.hadoop.classification.InterfaceAudience; | |
import org.apache.hadoop.conf.Configuration; | |
import org.apache.hadoop.fs.StorageType; | |
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; | |
import org.apache.hadoop.hdfs.DFSConfigKeys; | |
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; | |
import org.apache.hadoop.net.NetworkTopology; | |
import org.apache.hadoop.net.Node; | |
import org.apache.hadoop.net.NodeBase; | |
import com.google.common.annotations.VisibleForTesting; | |
/** | |
* The class is responsible for choosing the desired number of targets | |
* for placing block replicas. | |
* The replica placement strategy is that if the writer is on a datanode, | |
* the 1st replica is placed on the local machine, | |
* otherwise a random datanode. The 2nd replica is placed on a datanode | |
* that is on a different rack. The 3rd replica is placed on a different | |
* datanode in the same rack as the second replica. | |
*/ | |
@InterfaceAudience.Private | |
public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { | |
/** Used as the failure detail when DEBUG logging is not enabled. */
private static final String enableDebugLogging =
    "For more information, please enable DEBUG log level on "
        + BlockPlacementPolicy.class.getName();

/**
 * Per-thread buffer that collects the reasons why candidate datanodes were
 * rejected, so they can be reported when placement fails.
 */
private static final ThreadLocal<StringBuilder> debugLoggingBuilder =
    new ThreadLocal<StringBuilder>() {
      @Override
      protected StringBuilder initialValue() {
        return new StringBuilder();
      }
    };

/** Whether the load of a candidate node is checked in random selection. */
protected boolean considerLoad;

/** Whether the writer's own datanode is tried first. */
private boolean preferLocalNode = true;

/** Topology of the cluster used to resolve racks and distances. */
protected NetworkTopology clusterMap;

protected Host2NodesMap host2datanodeMap;

/** Cluster statistics; may be null, in which case callers guard access. */
private FSClusterStats stats;

/** Interval for DataNode heartbeats. */
protected long heartbeatInterval;

/** Interval used to identify stale DataNodes. */
private long staleInterval;

/**
 * A miss of that many heartbeats is tolerated for replica deletion policy.
 */
protected int tolerateHeartbeatMultiplier;

/** Instances are configured afterwards through {@link #initialize}. */
protected BlockPlacementPolicyDefault() {
}
@Override | |
public void initialize(Configuration conf, FSClusterStats stats, | |
NetworkTopology clusterMap, | |
Host2NodesMap host2datanodeMap) { | |
this.considerLoad = conf.getBoolean( | |
DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true); | |
this.stats = stats; | |
this.clusterMap = clusterMap; | |
this.host2datanodeMap = host2datanodeMap; | |
this.heartbeatInterval = conf.getLong( | |
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, | |
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000; | |
this.tolerateHeartbeatMultiplier = conf.getInt( | |
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_KEY, | |
DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT); | |
this.staleInterval = conf.getLong( | |
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY, | |
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT); | |
} | |
@Override | |
public DatanodeStorageInfo[] chooseTarget(String srcPath, | |
int numOfReplicas, | |
Node writer, | |
List<DatanodeStorageInfo> chosenNodes, | |
boolean returnChosenNodes, | |
Set<Node> excludedNodes, | |
long blocksize, | |
final BlockStoragePolicy storagePolicy) { | |
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes, | |
excludedNodes, blocksize, storagePolicy); | |
} | |
@Override | |
DatanodeStorageInfo[] chooseTarget(String src, | |
int numOfReplicas, | |
Node writer, | |
Set<Node> excludedNodes, | |
long blocksize, | |
List<DatanodeDescriptor> favoredNodes, | |
BlockStoragePolicy storagePolicy) { | |
try { | |
if (favoredNodes == null || favoredNodes.size() == 0) { | |
// Favored nodes not specified, fall back to regular block placement. | |
return chooseTarget(src, numOfReplicas, writer, | |
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, | |
excludedNodes, blocksize, storagePolicy); | |
} | |
Set<Node> favoriteAndExcludedNodes = excludedNodes == null ? | |
new HashSet<Node>() : new HashSet<>(excludedNodes); | |
final List<StorageType> requiredStorageTypes = storagePolicy | |
.chooseStorageTypes((short)numOfReplicas); | |
final EnumMap<StorageType, Integer> storageTypes = | |
getRequiredStorageTypes(requiredStorageTypes); | |
// Choose favored nodes | |
List<DatanodeStorageInfo> results = new ArrayList<>(); | |
boolean avoidStaleNodes = stats != null | |
&& stats.isAvoidingStaleDataNodesForWrite(); | |
int maxNodesAndReplicas[] = getMaxNodesPerRack(0, numOfReplicas); | |
numOfReplicas = maxNodesAndReplicas[0]; | |
int maxNodesPerRack = maxNodesAndReplicas[1]; | |
chooseFavouredNodes(src, numOfReplicas, favoredNodes, | |
favoriteAndExcludedNodes, blocksize, maxNodesPerRack, results, | |
avoidStaleNodes, storageTypes); | |
if (results.size() < numOfReplicas) { | |
// Not enough favored nodes, choose other nodes. | |
numOfReplicas -= results.size(); | |
DatanodeStorageInfo[] remainingTargets = | |
chooseTarget(src, numOfReplicas, writer, results, | |
false, favoriteAndExcludedNodes, blocksize, storagePolicy); | |
for (int i = 0; i < remainingTargets.length; i++) { | |
results.add(remainingTargets[i]); | |
} | |
} | |
return getPipeline(writer, | |
results.toArray(new DatanodeStorageInfo[results.size()])); | |
} catch (NotEnoughReplicasException nr) { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Failed to choose with favored nodes (=" + favoredNodes | |
+ "), disregard favored nodes hint and retry.", nr); | |
} | |
// Fall back to regular block placement disregarding favored nodes hint | |
return chooseTarget(src, numOfReplicas, writer, | |
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false, | |
excludedNodes, blocksize, storagePolicy); | |
} | |
} | |
protected void chooseFavouredNodes(String src, int numOfReplicas, | |
List<DatanodeDescriptor> favoredNodes, | |
Set<Node> favoriteAndExcludedNodes, long blocksize, int maxNodesPerRack, | |
List<DatanodeStorageInfo> results, boolean avoidStaleNodes, | |
EnumMap<StorageType, Integer> storageTypes) | |
throws NotEnoughReplicasException { | |
for (int i = 0; i < favoredNodes.size() && results.size() < numOfReplicas; | |
i++) { | |
DatanodeDescriptor favoredNode = favoredNodes.get(i); | |
// Choose a single node which is local to favoredNode. | |
// 'results' is updated within chooseLocalNode | |
final DatanodeStorageInfo target = | |
chooseLocalStorage(favoredNode, favoriteAndExcludedNodes, blocksize, | |
maxNodesPerRack, results, avoidStaleNodes, storageTypes, false); | |
if (target == null) { | |
LOG.warn("Could not find a target for file " + src | |
+ " with favored node " + favoredNode); | |
continue; | |
} | |
favoriteAndExcludedNodes.add(target.getDatanodeDescriptor()); | |
} | |
} | |
/** This is the implementation. */ | |
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas, | |
Node writer, | |
List<DatanodeStorageInfo> chosenStorage, | |
boolean returnChosenNodes, | |
Set<Node> excludedNodes, | |
long blocksize, | |
final BlockStoragePolicy storagePolicy) { | |
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { | |
return DatanodeStorageInfo.EMPTY_ARRAY; | |
} | |
if (excludedNodes == null) { | |
excludedNodes = new HashSet<>(); | |
} | |
int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas); | |
numOfReplicas = result[0]; | |
int maxNodesPerRack = result[1]; | |
final List<DatanodeStorageInfo> results = new ArrayList<>(chosenStorage); | |
for (DatanodeStorageInfo storage : chosenStorage) { | |
// add localMachine and related nodes to excludedNodes | |
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes); | |
} | |
boolean avoidStaleNodes = (stats != null | |
&& stats.isAvoidingStaleDataNodesForWrite()); | |
final Node localNode = chooseTarget(numOfReplicas, writer, excludedNodes, | |
blocksize, maxNodesPerRack, results, avoidStaleNodes, storagePolicy, | |
EnumSet.noneOf(StorageType.class), results.isEmpty()); | |
if (!returnChosenNodes) { | |
results.removeAll(chosenStorage); | |
} | |
// sorting nodes to form a pipeline | |
return getPipeline( | |
(writer != null && writer instanceof DatanodeDescriptor) ? writer | |
: localNode, | |
results.toArray(new DatanodeStorageInfo[results.size()])); | |
} | |
/** | |
* Calculate the maximum number of replicas to allocate per rack. It also | |
* limits the total number of replicas to the total number of nodes in the | |
* cluster. Caller should adjust the replica count to the return value. | |
* | |
* @param numOfChosen The number of already chosen nodes. | |
* @param numOfReplicas The number of additional nodes to allocate. | |
* @return integer array. Index 0: The number of nodes allowed to allocate | |
* in addition to already chosen nodes. | |
* Index 1: The maximum allowed number of nodes per rack. This | |
* is independent of the number of chosen nodes, as it is calculated | |
* using the target number of replicas. | |
*/ | |
protected int[] getMaxNodesPerRack(int numOfChosen, int numOfReplicas) { | |
int clusterSize = clusterMap.getNumOfLeaves(); | |
int totalNumOfReplicas = numOfChosen + numOfReplicas; | |
if (totalNumOfReplicas > clusterSize) { | |
numOfReplicas -= (totalNumOfReplicas-clusterSize); | |
totalNumOfReplicas = clusterSize; | |
} | |
// No calculation needed when there is only one rack or picking one node. | |
int numOfRacks = clusterMap.getNumOfRacks(); | |
if (numOfRacks == 1 || totalNumOfReplicas <= 1) { | |
return new int[] {numOfReplicas, totalNumOfReplicas}; | |
} | |
int maxNodesPerRack = (totalNumOfReplicas-1)/numOfRacks + 2; | |
// At this point, there are more than one racks and more than one replicas | |
// to store. Avoid all replicas being in the same rack. | |
// | |
// maxNodesPerRack has the following properties at this stage. | |
// 1) maxNodesPerRack >= 2 | |
// 2) (maxNodesPerRack-1) * numOfRacks > totalNumOfReplicas | |
// when numOfRacks > 1 | |
// | |
// Thus, the following adjustment will still result in a value that forces | |
// multi-rack allocation and gives enough number of total nodes. | |
if (maxNodesPerRack == totalNumOfReplicas) { | |
maxNodesPerRack--; | |
} | |
return new int[] {numOfReplicas, maxNodesPerRack}; | |
} | |
private EnumMap<StorageType, Integer> getRequiredStorageTypes( | |
List<StorageType> types) { | |
EnumMap<StorageType, Integer> map = new EnumMap<>(StorageType.class); | |
for (StorageType type : types) { | |
if (!map.containsKey(type)) { | |
map.put(type, 1); | |
} else { | |
int num = map.get(type); | |
map.put(type, num + 1); | |
} | |
} | |
return map; | |
} | |
/** | |
* choose <i>numOfReplicas</i> from all data nodes | |
* @param numOfReplicas additional number of replicas wanted | |
* @param writer the writer's machine, could be a non-DatanodeDescriptor node | |
* @param excludedNodes datanodes that should not be considered as targets | |
* @param blocksize size of the data to be written | |
* @param maxNodesPerRack max nodes allowed per rack | |
* @param results the target nodes already chosen | |
* @param avoidStaleNodes avoid stale nodes in replica choosing | |
* @return local node of writer (not chosen node) | |
*/ | |
private Node chooseTarget(int numOfReplicas, | |
Node writer, | |
final Set<Node> excludedNodes, | |
final long blocksize, | |
final int maxNodesPerRack, | |
final List<DatanodeStorageInfo> results, | |
final boolean avoidStaleNodes, | |
final BlockStoragePolicy storagePolicy, | |
final EnumSet<StorageType> unavailableStorages, | |
final boolean newBlock) { | |
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) { | |
return (writer instanceof DatanodeDescriptor) ? writer : null; | |
} | |
final int numOfResults = results.size(); | |
final int totalReplicasExpected = numOfReplicas + numOfResults; | |
if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) { | |
writer = results.get(0).getDatanodeDescriptor(); | |
} | |
// Keep a copy of original excludedNodes | |
final Set<Node> oldExcludedNodes = new HashSet<>(excludedNodes); | |
// choose storage types; use fallbacks for unavailable storages | |
final List<StorageType> requiredStorageTypes = storagePolicy | |
.chooseStorageTypes((short) totalReplicasExpected, | |
DatanodeStorageInfo.toStorageTypes(results), | |
unavailableStorages, newBlock); | |
final EnumMap<StorageType, Integer> storageTypes = | |
getRequiredStorageTypes(requiredStorageTypes); | |
if (LOG.isTraceEnabled()) { | |
LOG.trace("storageTypes=" + storageTypes); | |
} | |
try { | |
if ((numOfReplicas = requiredStorageTypes.size()) == 0) { | |
throw new NotEnoughReplicasException( | |
"All required storage types are unavailable: " | |
+ " unavailableStorages=" + unavailableStorages | |
+ ", storagePolicy=" + storagePolicy); | |
} | |
writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize, | |
maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes); | |
} catch (NotEnoughReplicasException e) { | |
final String message = "Failed to place enough replicas, still in need of " | |
+ (totalReplicasExpected - results.size()) + " to reach " | |
+ totalReplicasExpected | |
+ " (unavailableStorages=" + unavailableStorages | |
+ ", storagePolicy=" + storagePolicy | |
+ ", newBlock=" + newBlock + ")"; | |
if (LOG.isTraceEnabled()) { | |
LOG.trace(message, e); | |
} else { | |
LOG.warn(message + " " + e.getMessage()); | |
} | |
if (avoidStaleNodes) { | |
// Retry chooseTarget again, this time not avoiding stale nodes. | |
// excludedNodes contains the initial excludedNodes and nodes that were | |
// not chosen because they were stale, decommissioned, etc. | |
// We need to additionally exclude the nodes that were added to the | |
// result list in the successful calls to choose*() above. | |
for (DatanodeStorageInfo resultStorage : results) { | |
addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes); | |
} | |
// Set numOfReplicas, since it can get out of sync with the result list | |
// if the NotEnoughReplicasException was thrown in chooseRandom(). | |
numOfReplicas = totalReplicasExpected - results.size(); | |
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, | |
maxNodesPerRack, results, false, storagePolicy, unavailableStorages, | |
newBlock); | |
} | |
boolean retry = false; | |
// simply add all the remaining types into unavailableStorages and give | |
// another try. No best effort is guaranteed here. | |
for (StorageType type : storageTypes.keySet()) { | |
if (!unavailableStorages.contains(type)) { | |
unavailableStorages.add(type); | |
retry = true; | |
} | |
} | |
if (retry) { | |
for (DatanodeStorageInfo resultStorage : results) { | |
addToExcludedNodes(resultStorage.getDatanodeDescriptor(), | |
oldExcludedNodes); | |
} | |
numOfReplicas = totalReplicasExpected - results.size(); | |
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, | |
maxNodesPerRack, results, false, storagePolicy, unavailableStorages, | |
newBlock); | |
} | |
} | |
return writer; | |
} | |
protected Node chooseTargetInOrder(int numOfReplicas, | |
Node writer, | |
final Set<Node> excludedNodes, | |
final long blocksize, | |
final int maxNodesPerRack, | |
final List<DatanodeStorageInfo> results, | |
final boolean avoidStaleNodes, | |
final boolean newBlock, | |
EnumMap<StorageType, Integer> storageTypes) | |
throws NotEnoughReplicasException { | |
final int numOfResults = results.size(); | |
if (numOfResults == 0) { | |
writer = chooseLocalStorage(writer, excludedNodes, blocksize, | |
maxNodesPerRack, results, avoidStaleNodes, storageTypes, true) | |
.getDatanodeDescriptor(); | |
if (--numOfReplicas == 0) { | |
return writer; | |
} | |
} | |
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor(); | |
if (numOfResults <= 1) { | |
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, | |
results, avoidStaleNodes, storageTypes); | |
if (--numOfReplicas == 0) { | |
return writer; | |
} | |
} | |
if (numOfResults <= 2) { | |
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor(); | |
if (clusterMap.isOnSameRack(dn0, dn1)) { | |
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack, | |
results, avoidStaleNodes, storageTypes); | |
} else if (newBlock){ | |
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack, | |
results, avoidStaleNodes, storageTypes); | |
} else { | |
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, | |
results, avoidStaleNodes, storageTypes); | |
} | |
if (--numOfReplicas == 0) { | |
return writer; | |
} | |
} | |
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, | |
maxNodesPerRack, results, avoidStaleNodes, storageTypes); | |
return writer; | |
} | |
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, | |
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, | |
List<DatanodeStorageInfo> results, boolean avoidStaleNodes, | |
EnumMap<StorageType, Integer> storageTypes) | |
throws NotEnoughReplicasException { | |
// if no local machine, randomly choose one node | |
if (localMachine == null) { | |
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, | |
maxNodesPerRack, results, avoidStaleNodes, storageTypes); | |
} | |
if (preferLocalNode && localMachine instanceof DatanodeDescriptor | |
&& clusterMap.contains(localMachine)) { | |
DatanodeDescriptor localDatanode = (DatanodeDescriptor) localMachine; | |
// otherwise try local machine first | |
if (excludedNodes.add(localMachine) // was not in the excluded list | |
&& isGoodDatanode(localDatanode, maxNodesPerRack, false, | |
results, avoidStaleNodes)) { | |
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes | |
.entrySet().iterator(); iter.hasNext(); ) { | |
Map.Entry<StorageType, Integer> entry = iter.next(); | |
DatanodeStorageInfo localStorage = chooseStorage4Block( | |
localDatanode, blocksize, results, entry.getKey()); | |
if (localStorage != null) { | |
// add node and related nodes to excludedNode | |
addToExcludedNodes(localDatanode, excludedNodes); | |
int num = entry.getValue(); | |
if (num == 1) { | |
iter.remove(); | |
} else { | |
entry.setValue(num - 1); | |
} | |
return localStorage; | |
} | |
} | |
} | |
} | |
return null; | |
} | |
/** | |
* Choose <i>localMachine</i> as the target. | |
* if <i>localMachine</i> is not available, | |
* choose a node on the same rack | |
* @return the chosen storage | |
*/ | |
protected DatanodeStorageInfo chooseLocalStorage(Node localMachine, | |
Set<Node> excludedNodes, long blocksize, int maxNodesPerRack, | |
List<DatanodeStorageInfo> results, boolean avoidStaleNodes, | |
EnumMap<StorageType, Integer> storageTypes, boolean fallbackToLocalRack) | |
throws NotEnoughReplicasException { | |
DatanodeStorageInfo localStorage = chooseLocalStorage(localMachine, | |
excludedNodes, blocksize, maxNodesPerRack, results, | |
avoidStaleNodes, storageTypes); | |
if (localStorage != null) { | |
return localStorage; | |
} | |
if (!fallbackToLocalRack) { | |
return null; | |
} | |
// try a node on local rack | |
return chooseLocalRack(localMachine, excludedNodes, blocksize, | |
maxNodesPerRack, results, avoidStaleNodes, storageTypes); | |
} | |
/** | |
* Add <i>localMachine</i> and related nodes to <i>excludedNodes</i> | |
* for next replica choosing. In sub class, we can add more nodes within | |
* the same failure domain of localMachine | |
* @return number of new excluded nodes | |
*/ | |
protected int addToExcludedNodes(DatanodeDescriptor localMachine, | |
Set<Node> excludedNodes) { | |
return excludedNodes.add(localMachine) ? 1 : 0; | |
} | |
/** | |
* Choose one node from the rack that <i>localMachine</i> is on. | |
* if no such node is available, choose one node from the rack where | |
* a second replica is on. | |
* if still no such node is available, choose a random node | |
* in the cluster. | |
* @return the chosen node | |
*/ | |
protected DatanodeStorageInfo chooseLocalRack(Node localMachine, | |
Set<Node> excludedNodes, | |
long blocksize, | |
int maxNodesPerRack, | |
List<DatanodeStorageInfo> results, | |
boolean avoidStaleNodes, | |
EnumMap<StorageType, Integer> storageTypes) | |
throws NotEnoughReplicasException { | |
// no local machine, so choose a random machine | |
if (localMachine == null) { | |
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, | |
maxNodesPerRack, results, avoidStaleNodes, storageTypes); | |
} | |
final String localRack = localMachine.getNetworkLocation(); | |
try { | |
// choose one from the local rack | |
return chooseRandom(localRack, excludedNodes, | |
blocksize, maxNodesPerRack, results, avoidStaleNodes, storageTypes); | |
} catch (NotEnoughReplicasException e) { | |
// find the next replica and retry with its rack | |
for(DatanodeStorageInfo resultStorage : results) { | |
DatanodeDescriptor nextNode = resultStorage.getDatanodeDescriptor(); | |
if (nextNode != localMachine) { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Failed to choose from local rack (location = " + localRack | |
+ "), retry with the rack of the next replica (location = " | |
+ nextNode.getNetworkLocation() + ")", e); | |
} | |
return chooseFromNextRack(nextNode, excludedNodes, blocksize, | |
maxNodesPerRack, results, avoidStaleNodes, storageTypes); | |
} | |
} | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Failed to choose from local rack (location = " + localRack | |
+ "); the second replica is not found, retry choosing ramdomly", e); | |
} | |
//the second replica is not found, randomly choose one from the network | |
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, | |
maxNodesPerRack, results, avoidStaleNodes, storageTypes); | |
} | |
} | |
private DatanodeStorageInfo chooseFromNextRack(Node next, | |
Set<Node> excludedNodes, | |
long blocksize, | |
int maxNodesPerRack, | |
List<DatanodeStorageInfo> results, | |
boolean avoidStaleNodes, | |
EnumMap<StorageType, Integer> storageTypes) throws NotEnoughReplicasException { | |
final String nextRack = next.getNetworkLocation(); | |
try { | |
return chooseRandom(nextRack, excludedNodes, blocksize, maxNodesPerRack, | |
results, avoidStaleNodes, storageTypes); | |
} catch(NotEnoughReplicasException e) { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Failed to choose from the next rack (location = " + nextRack | |
+ "), retry choosing ramdomly", e); | |
} | |
//otherwise randomly choose one from the network | |
return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize, | |
maxNodesPerRack, results, avoidStaleNodes, storageTypes); | |
} | |
} | |
/** | |
* Choose <i>numOfReplicas</i> nodes from the racks | |
* that <i>localMachine</i> is NOT on. | |
* if not enough nodes are available, choose the remaining ones | |
* from the local rack | |
*/ | |
protected void chooseRemoteRack(int numOfReplicas, | |
DatanodeDescriptor localMachine, | |
Set<Node> excludedNodes, | |
long blocksize, | |
int maxReplicasPerRack, | |
List<DatanodeStorageInfo> results, | |
boolean avoidStaleNodes, | |
EnumMap<StorageType, Integer> storageTypes) | |
throws NotEnoughReplicasException { | |
int oldNumOfReplicas = results.size(); | |
// randomly choose one node from remote racks | |
try { | |
chooseRandom(numOfReplicas, "~" + localMachine.getNetworkLocation(), | |
excludedNodes, blocksize, maxReplicasPerRack, results, | |
avoidStaleNodes, storageTypes); | |
} catch (NotEnoughReplicasException e) { | |
if (LOG.isDebugEnabled()) { | |
LOG.debug("Failed to choose remote rack (location = ~" | |
+ localMachine.getNetworkLocation() + "), fallback to local rack", e); | |
} | |
chooseRandom(numOfReplicas-(results.size()-oldNumOfReplicas), | |
localMachine.getNetworkLocation(), excludedNodes, blocksize, | |
maxReplicasPerRack, results, avoidStaleNodes, storageTypes); | |
} | |
} | |
/** | |
* Randomly choose one target from the given <i>scope</i>. | |
* @return the chosen storage, if there is any. | |
*/ | |
protected DatanodeStorageInfo chooseRandom(String scope, | |
Set<Node> excludedNodes, | |
long blocksize, | |
int maxNodesPerRack, | |
List<DatanodeStorageInfo> results, | |
boolean avoidStaleNodes, | |
EnumMap<StorageType, Integer> storageTypes) | |
throws NotEnoughReplicasException { | |
return chooseRandom(1, scope, excludedNodes, blocksize, maxNodesPerRack, | |
results, avoidStaleNodes, storageTypes); | |
} | |
/** | |
* Randomly choose <i>numOfReplicas</i> targets from the given <i>scope</i>. | |
* @return the first chosen node, if there is any. | |
*/ | |
protected DatanodeStorageInfo chooseRandom(int numOfReplicas, | |
String scope, | |
Set<Node> excludedNodes, | |
long blocksize, | |
int maxNodesPerRack, | |
List<DatanodeStorageInfo> results, | |
boolean avoidStaleNodes, | |
EnumMap<StorageType, Integer> storageTypes) | |
throws NotEnoughReplicasException { | |
int numOfAvailableNodes = clusterMap.countNumOfAvailableNodes( | |
scope, excludedNodes); | |
StringBuilder builder = null; | |
if (LOG.isDebugEnabled()) { | |
builder = debugLoggingBuilder.get(); | |
builder.setLength(0); | |
builder.append("["); | |
} | |
boolean badTarget = false; | |
DatanodeStorageInfo firstChosen = null; | |
while(numOfReplicas > 0 && numOfAvailableNodes > 0) { | |
DatanodeDescriptor chosenNode = chooseDataNode(scope); | |
if (excludedNodes.add(chosenNode)) { //was not in the excluded list | |
if (LOG.isDebugEnabled()) { | |
builder.append("\nNode ").append(NodeBase.getPath(chosenNode)).append(" ["); | |
} | |
numOfAvailableNodes--; | |
DatanodeStorageInfo storage = null; | |
if (isGoodDatanode(chosenNode, maxNodesPerRack, considerLoad, | |
results, avoidStaleNodes)) { | |
for (Iterator<Map.Entry<StorageType, Integer>> iter = storageTypes | |
.entrySet().iterator(); iter.hasNext(); ) { | |
Map.Entry<StorageType, Integer> entry = iter.next(); | |
storage = chooseStorage4Block( | |
chosenNode, blocksize, results, entry.getKey()); | |
if (storage != null) { | |
numOfReplicas--; | |
if (firstChosen == null) { | |
firstChosen = storage; | |
} | |
// add node and related nodes to excludedNode | |
numOfAvailableNodes -= | |
addToExcludedNodes(chosenNode, excludedNodes); | |
int num = entry.getValue(); | |
if (num == 1) { | |
iter.remove(); | |
} else { | |
entry.setValue(num - 1); | |
} | |
break; | |
} | |
} | |
} | |
if (LOG.isDebugEnabled()) { | |
builder.append("\n]"); | |
} | |
// If no candidate storage was found on this DN then set badTarget. | |
badTarget = (storage == null); | |
} | |
} | |
if (numOfReplicas>0) { | |
String detail = enableDebugLogging; | |
if (LOG.isDebugEnabled()) { | |
if (badTarget && builder != null) { | |
detail = builder.toString(); | |
builder.setLength(0); | |
} else { | |
detail = ""; | |
} | |
} | |
throw new NotEnoughReplicasException(detail); | |
} | |
return firstChosen; | |
} | |
/** | |
* Choose a datanode from the given <i>scope</i>. | |
* @return the chosen node, if there is any. | |
*/ | |
protected DatanodeDescriptor chooseDataNode(final String scope) { | |
return (DatanodeDescriptor) clusterMap.chooseRandom(scope); | |
} | |
/** | |
* Choose a good storage of given storage type from datanode, and add it to | |
* the result list. | |
* | |
* @param dnd datanode descriptor | |
* @param blockSize requested block size | |
* @param results the result storages | |
* @param storageType requested storage type | |
* @return the chosen datanode storage | |
*/ | |
DatanodeStorageInfo chooseStorage4Block(DatanodeDescriptor dnd, | |
long blockSize, | |
List<DatanodeStorageInfo> results, | |
StorageType storageType) { | |
DatanodeStorageInfo storage = | |
dnd.chooseStorage4Block(storageType, blockSize); | |
if (storage != null) { | |
results.add(storage); | |
} else { | |
logNodeIsNotChosen(dnd, "no good storage to place the block "); | |
} | |
return storage; | |
} | |
private static void logNodeIsNotChosen(DatanodeDescriptor node, | |
String reason) { | |
if (LOG.isDebugEnabled()) { | |
// build the error message for later use. | |
debugLoggingBuilder.get() | |
.append("\n Datanode ").append(node) | |
.append(" is not chosen since ").append(reason).append("."); | |
} | |
} | |
/** | |
* Determine if a datanode is good for placing block. | |
* | |
* @param node The target datanode | |
* @param maxTargetPerRack Maximum number of targets per rack. The value of | |
* this parameter depends on the number of racks in | |
* the cluster and total number of replicas for a block | |
* @param considerLoad whether or not to consider load of the target node | |
* @param results A list containing currently chosen nodes. Used to check if | |
* too many nodes has been chosen in the target rack. | |
* @param avoidStaleNodes Whether or not to avoid choosing stale nodes | |
* @return Reture true if the datanode is good candidate, otherwise false | |
*/ | |
boolean isGoodDatanode(DatanodeDescriptor node, | |
int maxTargetPerRack, boolean considerLoad, | |
List<DatanodeStorageInfo> results, | |
boolean avoidStaleNodes) { | |
// check if the node is (being) decommissioned | |
if (node.isDecommissionInProgress() || node.isDecommissioned()) { | |
logNodeIsNotChosen(node, "the node is (being) decommissioned "); | |
return false; | |
} | |
if (avoidStaleNodes) { | |
if (node.isStale(this.staleInterval)) { | |
logNodeIsNotChosen(node, "the node is stale "); | |
return false; | |
} | |
} | |
// check the communication traffic of the target machine | |
if (considerLoad) { | |
final double maxLoad = 2.0 * stats.getInServiceXceiverAverage(); | |
final int nodeLoad = node.getXceiverCount(); | |
if (nodeLoad > maxLoad) { | |
logNodeIsNotChosen(node, "the node is too busy (load: " + nodeLoad | |
+ " > " + maxLoad + ") "); | |
return false; | |
} | |
} | |
// check if the target rack has chosen too many nodes | |
String rackname = node.getNetworkLocation(); | |
int counter=1; | |
for(DatanodeStorageInfo resultStorage : results) { | |
if (rackname.equals( | |
resultStorage.getDatanodeDescriptor().getNetworkLocation())) { | |
counter++; | |
} | |
} | |
if (counter > maxTargetPerRack) { | |
logNodeIsNotChosen(node, "the rack has too many chosen nodes "); | |
return false; | |
} | |
return true; | |
} | |
/** | |
* Return a pipeline of nodes. | |
* The pipeline is formed finding a shortest path that | |
* starts from the writer and traverses all <i>nodes</i> | |
* This is basically a traveling salesman problem. | |
*/ | |
private DatanodeStorageInfo[] getPipeline(Node writer, | |
DatanodeStorageInfo[] storages) { | |
if (storages.length == 0) { | |
return storages; | |
} | |
synchronized(clusterMap) { | |
int index=0; | |
if (writer == null || !clusterMap.contains(writer)) { | |
writer = storages[0].getDatanodeDescriptor(); | |
} | |
for(; index < storages.length; index++) { | |
DatanodeStorageInfo shortestStorage = storages[index]; | |
int shortestDistance = clusterMap.getDistance(writer, | |
shortestStorage.getDatanodeDescriptor()); | |
int shortestIndex = index; | |
for(int i = index + 1; i < storages.length; i++) { | |
int currentDistance = clusterMap.getDistance(writer, | |
storages[i].getDatanodeDescriptor()); | |
if (shortestDistance>currentDistance) { | |
shortestDistance = currentDistance; | |
shortestStorage = storages[i]; | |
shortestIndex = i; | |
} | |
} | |
//switch position index & shortestIndex | |
if (index != shortestIndex) { | |
storages[shortestIndex] = storages[index]; | |
storages[index] = shortestStorage; | |
} | |
writer = shortestStorage.getDatanodeDescriptor(); | |
} | |
} | |
return storages; | |
} | |
@Override | |
public BlockPlacementStatus verifyBlockPlacement(DatanodeInfo[] locs, | |
int numberOfReplicas) { | |
if (locs == null) | |
locs = DatanodeDescriptor.EMPTY_ARRAY; | |
if (!clusterMap.hasClusterEverBeenMultiRack()) { | |
// only one rack | |
return new BlockPlacementStatusDefault(1, 1); | |
} | |
int minRacks = 2; | |
minRacks = Math.min(minRacks, numberOfReplicas); | |
// 1. Check that all locations are different. | |
// 2. Count locations on different racks. | |
Set<String> racks = new TreeSet<String>(); | |
for (DatanodeInfo dn : locs) | |
racks.add(dn.getNetworkLocation()); | |
return new BlockPlacementStatusDefault(racks.size(), minRacks); | |
} | |
/** | |
* Decide whether deleting the specified replica of the block still makes | |
* the block conform to the configured block placement policy. | |
* @param replicationFactor The required number of replicas for this block | |
* @param moreThanone The replica locations of this block that are present | |
* on more than one unique racks. | |
* @param exactlyOne Replica locations of this block that are present | |
* on exactly one unique racks. | |
* @param excessTypes The excess {@link StorageType}s according to the | |
* {@link BlockStoragePolicy}. | |
* | |
* @return the replica that is the best candidate for deletion | |
*/ | |
@VisibleForTesting | |
public DatanodeStorageInfo chooseReplicaToDelete(short replicationFactor, | |
Collection<DatanodeStorageInfo> moreThanone, Collection<DatanodeStorageInfo> exactlyOne, | |
final List<StorageType> excessTypes) { | |
long oldestHeartbeat = | |
monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier; | |
DatanodeStorageInfo oldestHeartbeatStorage = null; | |
long minSpace = Long.MAX_VALUE; | |
DatanodeStorageInfo minSpaceStorage = null; | |
// Pick the node with the oldest heartbeat or with the least free space, | |
// if all hearbeats are within the tolerable heartbeat interval | |
for(DatanodeStorageInfo storage : pickupReplicaSet(moreThanone, exactlyOne)) { | |
if (!excessTypes.contains(storage.getStorageType())) { | |
continue; | |
} | |
final DatanodeDescriptor node = storage.getDatanodeDescriptor(); | |
long free = node.getRemaining(); | |
long lastHeartbeat = node.getLastUpdateMonotonic(); | |
if (lastHeartbeat < oldestHeartbeat) { | |
oldestHeartbeat = lastHeartbeat; | |
oldestHeartbeatStorage = storage; | |
} | |
if (minSpace > free) { | |
minSpace = free; | |
minSpaceStorage = storage; | |
} | |
} | |
final DatanodeStorageInfo storage; | |
if (oldestHeartbeatStorage != null) { | |
storage = oldestHeartbeatStorage; | |
} else if (minSpaceStorage != null) { | |
storage = minSpaceStorage; | |
} else { | |
return null; | |
} | |
excessTypes.remove(storage.getStorageType()); | |
return storage; | |
} | |
@Override | |
public List<DatanodeStorageInfo> chooseReplicasToDelete( | |
Collection<DatanodeStorageInfo> candidates, | |
int expectedNumOfReplicas, | |
List<StorageType> excessTypes, | |
DatanodeDescriptor addedNode, | |
DatanodeDescriptor delNodeHint) { | |
List<DatanodeStorageInfo> excessReplicas = new ArrayList<>(); | |
final Map<String, List<DatanodeStorageInfo>> rackMap = new HashMap<>(); | |
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<>(); | |
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<>(); | |
// split nodes into two sets | |
// moreThanOne contains nodes on rack with more than one replica | |
// exactlyOne contains the remaining nodes | |
splitNodesWithRack(candidates, rackMap, moreThanOne, exactlyOne); | |
// pick one node to delete that favors the delete hint | |
// otherwise pick one with least space from priSet if it is not empty | |
// otherwise one node with least space from remains | |
boolean firstOne = true; | |
final DatanodeStorageInfo delNodeHintStorage = | |
DatanodeStorageInfo.getDatanodeStorageInfo(candidates, delNodeHint); | |
final DatanodeStorageInfo addedNodeStorage = | |
DatanodeStorageInfo.getDatanodeStorageInfo(candidates, addedNode); | |
while (candidates.size() - expectedNumOfReplicas > excessReplicas.size()) { | |
final DatanodeStorageInfo cur; | |
if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage, | |
moreThanOne, excessTypes)) { | |
cur = delNodeHintStorage; | |
} else { // regular excessive replica removal | |
cur = | |
chooseReplicaToDelete((short) expectedNumOfReplicas, moreThanOne, exactlyOne, | |
excessTypes); | |
} | |
firstOne = false; | |
if (cur == null) { | |
LOG.warn("No excess replica can be found. excessTypes: {}." + | |
" moreThanOne: {}. exactlyOne: {}.", excessTypes, moreThanOne, | |
exactlyOne); | |
break; | |
} | |
// adjust rackmap, moreThanOne, and exactlyOne | |
adjustSetsWithChosenReplica(rackMap, moreThanOne, exactlyOne, cur); | |
excessReplicas.add(cur); | |
} | |
return excessReplicas; | |
} | |
/** Check if we can use delHint. */ | |
@VisibleForTesting | |
static boolean useDelHint(boolean isFirst, DatanodeStorageInfo delHint, | |
DatanodeStorageInfo added, List<DatanodeStorageInfo> moreThan1Racks, | |
List<StorageType> excessTypes) { | |
if (!isFirst) { | |
return false; // only consider delHint for the first case | |
} else if (delHint == null) { | |
return false; // no delHint | |
} else if (!excessTypes.contains(delHint.getStorageType())) { | |
return false; // delHint storage type is not an excess type | |
} else { | |
// check if removing delHint reduces the number of racks | |
if (moreThan1Racks.contains(delHint)) { | |
return true; // delHint and some other nodes are under the same rack | |
} else if (added != null && !moreThan1Racks.contains(added)) { | |
return true; // the added node adds a new rack | |
} | |
return false; // removing delHint reduces the number of racks; | |
} | |
} | |
/** | |
* Pick up replica node set for deleting replica as over-replicated. | |
* First set contains replica nodes on rack with more than one | |
* replica while second set contains remaining replica nodes. | |
* So pick up first set if not empty. If first is empty, then pick second. | |
*/ | |
protected Collection<DatanodeStorageInfo> pickupReplicaSet( | |
Collection<DatanodeStorageInfo> moreThanOne, | |
Collection<DatanodeStorageInfo> exactlyOne) { | |
return moreThanOne.isEmpty() ? exactlyOne : moreThanOne; | |
} | |
@VisibleForTesting | |
void setPreferLocalNode(boolean prefer) { | |
this.preferLocalNode = prefer; | |
} | |
} | |
HDFS-8863: ディスク使用率が高いときは注意が必要。SSDが先に埋まる場合など、普通に起こり得る。
Fixed in 2.6.1/2.7.2/HDP2.3.2
HDFS-8946とHDFS-8884で、Blockを配置するDataNodeを探索するアルゴリズムが洗練されて、探索回数が減っている。
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
結論を先に言うと、Rack awarenessよりもBlock placement policyが優先される。
一般に、DataNode3台を決めるときには、ローカル、別ラック、2台のうちどちらかと同一ラック、というように選択する(L411以降を参照)が、それぞれのポリシーが以下のようにfallbackしていく。最終的にはラック関係なくDataNodeを全探索する。
全探索について:
L670-702あたりを見ると、scopeの範囲内で、必要なStorageTypeを持ち、残容量も余っているDataNodeを選ぶようになっている。scopeの範囲は先に述べたfallbackにより変わっていく。
全探索しても必要なStorageTypeが見つからなかった場合は、fallbackする。
具体的には、L345以降が実行される。見つからなかったStorageTypeは、unavailableStoragesに登録されて、L302のchooseTarget自体が再実行される。unavailableStoragesは、L325, L326のfallbackのときに利用される。