Created
February 16, 2014 03:47
-
-
Save anonymous/9028934 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- tmp3/clouderahadoop-0.20.2-CDH3u2-src/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java 2011-10-14 01:39:59.000000000 -0700 | |
+++ clouderahadoop-0.20.2-CDH3u2-src/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java 2014-02-15 19:43:12.000000000 -0800 | |
@@ -78,6 +78,13 @@ | |
Random r = new Random(); | |
final String clientName; | |
final LeaseChecker leasechecker = new LeaseChecker(); | |
+ | |
+ final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes1 = | |
+ new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>(); | |
+ | |
+ final ConcurrentHashMap<DatanodeInfo, Integer> dampCounter = | |
+ new ConcurrentHashMap<DatanodeInfo, Integer>(); | |
+ | |
private Configuration conf; | |
private long defaultBlockSize; | |
private short defaultReplication; | |
@@ -1062,11 +1069,50 @@ | |
* Entries in <i>nodes</i> are already in the priority order | |
*/ | |
private DatanodeInfo bestNode(DatanodeInfo nodes[], | |
- AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes) | |
+ AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes, LocatedBlock block) | |
throws IOException { | |
if (nodes != null) { | |
+ | |
+ LOG.warn("Total nodes to consider " + nodes.length); | |
+ LOG.warn("Inside bestNode Size of deadNodes map: " + deadNodes.size()); | |
+ | |
+ // print full deadNodes map (loop body below is commented out, so this is currently a no-op): | |
+ | |
+ for (AbstractMap.Entry<DatanodeInfo, DatanodeInfo> entry : deadNodes.entrySet()) { | |
+ //LOG.warn("Print All DeadNodes:: " + entry.getKey().getName()); | |
+ } | |
+ | |
+ | |
+ | |
for (int i = 0; i < nodes.length; i++) { | |
+ | |
+ LOG.warn("Considering Node:: " + nodes[i].getName()); | |
+ | |
+ if (deadNodes.containsKey(nodes[i])) { | |
+ if( dampCounter.get(nodes[i]) < 10 ) { | |
+ | |
+ int currentCount = dampCounter.get(nodes[i]); | |
+ currentCount ++; | |
+ dampCounter.put(nodes[i],currentCount); | |
+ | |
+ LOG.warn("Increment fetch attempt from deadListed datanode " + nodes[i].getName() + " :: " + dampCounter.get(nodes[i])); | |
+ // not going to mess with caching inside InputStream, just set up the dampening counter | |
+ } | |
+ | |
+ else { | |
+ | |
+ LOG.warn("Remove Node from deadNodes:: " + nodes[i].getName() + " at counter " + dampCounter); | |
+ deadNodes.remove(nodes[i]); | |
+ dampCounter.put(nodes[i],0); | |
+ | |
+ | |
+ } | |
+ } | |
+ | |
if (!deadNodes.containsKey(nodes[i])) { | |
+ | |
+ LOG.warn("Found bestNode:: " + nodes[i].getName()); | |
+ | |
return nodes[i]; | |
} | |
} | |
@@ -1580,8 +1626,22 @@ | |
private byte[] oneByteBuf = new byte[1]; // used for 'int read()' | |
- void addToDeadNodes(DatanodeInfo dnInfo) { | |
- deadNodes.put(dnInfo, dnInfo); | |
+ synchronized void addToDeadNodes(DatanodeInfo dnInfo) { | |
+ deadNodes1.put(dnInfo, dnInfo); | |
+ | |
+ dampCounter.putIfAbsent(dnInfo, 1); | |
+ | |
+ String hostName1 = dnInfo.getHostName(); | |
+ LOG.warn("Adding to deadNodes " + hostName1); | |
+ // print deadNodes map | |
+ LOG.warn("Inside addToDeadNodes Size of deadNodes map: " + deadNodes1.size()); | |
+ | |
+ // print full deadnodes map: | |
+ | |
+ for (ConcurrentHashMap.Entry<DatanodeInfo, DatanodeInfo> entry : deadNodes1.entrySet()) { | |
+ LOG.warn("Inside addToDeadNodes Print All DeadNodes:: " + entry.getKey().getName()); | |
+ } | |
+ | |
} | |
DFSInputStream(String src, int buffersize, boolean verifyChecksum | |
@@ -1608,6 +1668,7 @@ | |
!newInfo.isUnderConstruction()) { | |
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator(); | |
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator(); | |
+ | |
while (oldIter.hasNext() && newIter.hasNext()) { | |
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) { | |
throw new IOException("Blocklist for " + src + " has changed!"); | |
@@ -1648,6 +1709,7 @@ | |
} | |
} | |
} | |
+ | |
this.locatedBlocks = newInfo; | |
this.currentNode = null; | |
} | |
@@ -1696,6 +1758,9 @@ | |
// fetch more blocks | |
LocatedBlocks newBlocks; | |
newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize); | |
+ | |
+ LOG.warn("getBlockAt Block is not cached, fetch from namenode: " + targetBlockIdx); | |
+ | |
assert (newBlocks != null) : "Could not find target position " + offset; | |
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); | |
} | |
@@ -1714,6 +1779,7 @@ | |
int targetBlockIdx = locatedBlocks.findBlock(offset); | |
if (targetBlockIdx < 0) { // block is not cached | |
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx); | |
+ LOG.warn("fetchBlockAt Block is not cached, fetch from namenode: " + targetBlockIdx); | |
} | |
// fetch blocks | |
LocatedBlocks newBlocks; | |
@@ -1742,6 +1808,7 @@ | |
int blockIdx = locatedBlocks.findBlock(offset); | |
if (blockIdx < 0) { // block is not cached | |
blockIdx = LocatedBlocks.getInsertIndex(blockIdx); | |
+ LOG.warn("getBlockRange Block is not cached, fetch from namenode: " + blockIdx); | |
} | |
long remaining = length; | |
long curOff = offset; | |
@@ -1978,14 +2045,20 @@ | |
} | |
+ /* block is coming from locatedBlocks, which is cached */ | |
private DNAddrPair chooseDataNode(LocatedBlock block) | |
throws IOException { | |
while (true) { | |
DatanodeInfo[] nodes = block.getLocations(); | |
try { | |
- DatanodeInfo chosenNode = bestNode(nodes, deadNodes); | |
+ | |
+ // do not return anything from deadNodes | |
+ DatanodeInfo chosenNode = bestNode(nodes, deadNodes1, block); | |
InetSocketAddress targetAddr = | |
NetUtils.createSocketAddr(chosenNode.getName()); | |
+ | |
+ LOG.info("Datanode available for block: " + chosenNode.getName()); | |
+ | |
return new DNAddrPair(chosenNode, targetAddr); | |
} catch (IOException ie) { | |
String blockInfo = block.getBlock() + " file=" + src; | |
@@ -2003,7 +2076,7 @@ | |
Thread.sleep(3000); | |
} catch (InterruptedException iex) { | |
} | |
- deadNodes.clear(); //2nd option is to remove only nodes[blockId] | |
+ deadNodes1.clear(); //2nd option is to remove only nodes[blockId] | |
openInfo(); | |
block = getBlockAt(block.getStartOffset(), false); | |
failures++; | |
@@ -2066,14 +2139,16 @@ | |
} else { | |
LOG.warn("Failed to connect to " + targetAddr + | |
" for file " + src + | |
- " for block " + block.getBlock().getBlockId() + ":" + | |
+ " for block " + block.getBlock().getBlockId() + ": Will add to deadNodes: " + | |
StringUtils.stringifyException(e)); | |
+ | |
} | |
} finally { | |
IOUtils.closeStream(reader); | |
IOUtils.closeSocket(dn); | |
} | |
// Put chosen node into dead list, continue | |
+ LOG.warn("Adding server to deadNodes, maybe? " + chosenNode.getName()); | |
addToDeadNodes(chosenNode); | |
} | |
} | |
@@ -2192,14 +2267,14 @@ | |
*/ | |
@Override | |
public synchronized boolean seekToNewSource(long targetPos) throws IOException { | |
- boolean markedDead = deadNodes.containsKey(currentNode); | |
+ boolean markedDead = deadNodes1.containsKey(currentNode); | |
addToDeadNodes(currentNode); | |
DatanodeInfo oldNode = currentNode; | |
DatanodeInfo newNode = blockSeekTo(targetPos); | |
if (!markedDead) { | |
/* remove it from deadNodes. blockSeekTo could have cleared | |
* deadNodes and added currentNode again. Thats ok. */ | |
- deadNodes.remove(oldNode); | |
+ deadNodes1.remove(oldNode); | |
} | |
if (!oldNode.getStorageID().equals(newNode.getStorageID())) { | |
currentNode = newNode; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment