--- tmp3/clouderahadoop-0.20.2-CDH3u2-src/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java 2011-10-14 01:39:59.000000000 -0700
+++ clouderahadoop-0.20.2-CDH3u2-src/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java 2014-02-15 19:43:12.000000000 -0800
@@ -78,6 +78,13 @@
Random r = new Random();
final String clientName;
final LeaseChecker leasechecker = new LeaseChecker();
+
+ final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes1 =
+ new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
+
+ final ConcurrentHashMap<DatanodeInfo, Integer> dampCounter =
+ new ConcurrentHashMap<DatanodeInfo, Integer>();
+
private Configuration conf;
private long defaultBlockSize;
private short defaultReplication;
@@ -1062,11 +1069,43 @@
* Entries in <i>nodes</i> are already in the priority order
*/
private DatanodeInfo bestNode(DatanodeInfo nodes[],
- AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
+ AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes, LocatedBlock block)
throws IOException {
if (nodes != null) {
+
+ LOG.warn("Total nodes to consider " + nodes.length);
+ LOG.warn("Inside bestNode Size of deadNodes map: " + deadNodes.size());
+
+ // log the full deadNodes map for debugging
+ for (Map.Entry<DatanodeInfo, DatanodeInfo> entry : deadNodes.entrySet()) {
+ LOG.warn("bestNode: deadNodes entry:: " + entry.getKey().getName());
+ }
+
for (int i = 0; i < nodes.length; i++) {
+
+ LOG.warn("Considering Node:: " + nodes[i].getName());
+
+ if (deadNodes.containsKey(nodes[i])) {
+
+ // dampening: skip a dead-listed node for 10 attempts, then give it another chance;
+ // not going to mess with caching inside the InputStream, just maintain the counter
+ Integer prev = dampCounter.get(nodes[i]);
+ int currentCount = (prev == null) ? 0 : prev;
+
+ if (currentCount < 10) {
+ dampCounter.put(nodes[i], currentCount + 1);
+ LOG.warn("Increment fetch attempt from dead-listed datanode " + nodes[i].getName() + " :: " + (currentCount + 1));
+ } else {
+ LOG.warn("Remove node from deadNodes:: " + nodes[i].getName() + " at counter " + currentCount);
+ deadNodes.remove(nodes[i]);
+ dampCounter.put(nodes[i], 0);
+ }
+ }
+
if (!deadNodes.containsKey(nodes[i])) {
+
+ LOG.warn("Found bestNode:: " + nodes[i].getName());
+
return nodes[i];
}
}
@@ -1580,8 +1626,20 @@
private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
- void addToDeadNodes(DatanodeInfo dnInfo) {
- deadNodes.put(dnInfo, dnInfo);
+ synchronized void addToDeadNodes(DatanodeInfo dnInfo) {
+ deadNodes1.put(dnInfo, dnInfo);
+
+ // start the dampening counter for this node if it does not have one yet
+ dampCounter.putIfAbsent(dnInfo, 1);
+
+ LOG.warn("Adding to deadNodes " + dnInfo.getHostName());
+ LOG.warn("Inside addToDeadNodes size of deadNodes map: " + deadNodes1.size());
+
+ // log the full deadNodes map for debugging
+ for (Map.Entry<DatanodeInfo, DatanodeInfo> entry : deadNodes1.entrySet()) {
+ LOG.warn("Inside addToDeadNodes deadNodes entry:: " + entry.getKey().getName());
+ }
+
}
DFSInputStream(String src, int buffersize, boolean verifyChecksum
@@ -1608,6 +1668,7 @@
!newInfo.isUnderConstruction()) {
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
+
while (oldIter.hasNext() && newIter.hasNext()) {
if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
throw new IOException("Blocklist for " + src + " has changed!");
@@ -1648,6 +1709,7 @@
}
}
}
+
this.locatedBlocks = newInfo;
this.currentNode = null;
}
@@ -1696,6 +1758,9 @@
// fetch more blocks
LocatedBlocks newBlocks;
newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+
+ LOG.warn("getBlockAt Block is not cached, fetch from namenode: " + targetBlockIdx);
+
assert (newBlocks != null) : "Could not find target position " + offset;
locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
}
@@ -1714,6 +1779,7 @@
int targetBlockIdx = locatedBlocks.findBlock(offset);
if (targetBlockIdx < 0) { // block is not cached
targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+ LOG.warn("fetchBlockAt Block is not cached, fetch from namenode: " + targetBlockIdx);
}
// fetch blocks
LocatedBlocks newBlocks;
@@ -1742,6 +1808,7 @@
int blockIdx = locatedBlocks.findBlock(offset);
if (blockIdx < 0) { // block is not cached
blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
+ LOG.warn("getBlockRange Block is not cached, fetch from namenode: " + blockIdx);
}
long remaining = length;
long curOff = offset;
@@ -1978,14 +2045,20 @@
}
+ /* block comes from locatedBlocks, which is cached */
private DNAddrPair chooseDataNode(LocatedBlock block)
throws IOException {
while (true) {
DatanodeInfo[] nodes = block.getLocations();
try {
- DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
+
+ // bestNode never returns a node that is currently dead-listed in deadNodes1
+ DatanodeInfo chosenNode = bestNode(nodes, deadNodes1, block);
InetSocketAddress targetAddr =
NetUtils.createSocketAddr(chosenNode.getName());
+
+ LOG.info("Datanode available for block: " + chosenNode.getName());
+
return new DNAddrPair(chosenNode, targetAddr);
} catch (IOException ie) {
String blockInfo = block.getBlock() + " file=" + src;
@@ -2003,7 +2076,7 @@
Thread.sleep(3000);
} catch (InterruptedException iex) {
}
- deadNodes.clear(); //2nd option is to remove only nodes[blockId]
+ deadNodes1.clear(); // an alternative is to remove only nodes[blockId]
openInfo();
block = getBlockAt(block.getStartOffset(), false);
failures++;
@@ -2066,14 +2139,16 @@
} else {
LOG.warn("Failed to connect to " + targetAddr +
" for file " + src +
- " for block " + block.getBlock().getBlockId() + ":" +
+ " for block " + block.getBlock().getBlockId() + ": Will add to deadNodes: " +
StringUtils.stringifyException(e));
+
}
} finally {
IOUtils.closeStream(reader);
IOUtils.closeSocket(dn);
}
// Put chosen node into dead list, continue
+ LOG.warn("Adding server to deadNodes, maybe? " + chosenNode.getName());
addToDeadNodes(chosenNode);
}
}
@@ -2192,14 +2267,14 @@
*/
@Override
public synchronized boolean seekToNewSource(long targetPos) throws IOException {
- boolean markedDead = deadNodes.containsKey(currentNode);
+ boolean markedDead = deadNodes1.containsKey(currentNode);
addToDeadNodes(currentNode);
DatanodeInfo oldNode = currentNode;
DatanodeInfo newNode = blockSeekTo(targetPos);
if (!markedDead) {
/* remove it from deadNodes. blockSeekTo could have cleared
* deadNodes and added currentNode again. Thats ok. */
- deadNodes.remove(oldNode);
+ deadNodes1.remove(oldNode);
}
if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
currentNode = newNode;
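
For readers who want the dampening idea without the surrounding DFSClient plumbing, the pattern the patch threads through addToDeadNodes() and bestNode() can be distilled into a small standalone class. This is a minimal sketch, not DFSClient code: the class name DampenedDeadList, the constant MAX_SKIPS, and the main() driver are illustrative assumptions; only the two-map structure (a dead list plus a per-node skip counter) mirrors the patch above.

import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the dampening pattern in the patch above.
// DampenedDeadList and MAX_SKIPS are illustrative names, not DFSClient API.
public class DampenedDeadList<N> {

    private static final int MAX_SKIPS = 10; // matches the patch's hard-coded 10

    // node -> node, mirroring deadNodes1 in the patch
    private final ConcurrentHashMap<N, N> deadNodes = new ConcurrentHashMap<N, N>();
    // node -> number of times the node has been skipped, mirroring dampCounter
    private final ConcurrentHashMap<N, Integer> dampCounter = new ConcurrentHashMap<N, Integer>();

    // Dead-list a node after a failure; start its skip counter if absent.
    public synchronized void markDead(N node) {
        deadNodes.put(node, node);
        dampCounter.putIfAbsent(node, 1);
    }

    // Return the first usable node in priority order. A dead-listed node is
    // skipped until its counter reaches MAX_SKIPS, then removed from the dead
    // list and eligible again on the same call.
    public synchronized N bestNode(N[] nodes) {
        for (N node : nodes) {
            if (deadNodes.containsKey(node)) {
                Integer prev = dampCounter.get(node);
                int count = (prev == null) ? 0 : prev;
                if (count < MAX_SKIPS) {
                    dampCounter.put(node, count + 1); // still dampened: skip it
                } else {
                    deadNodes.remove(node);           // give it another chance
                    dampCounter.put(node, 0);
                }
            }
            if (!deadNodes.containsKey(node)) {
                return node;
            }
        }
        return null; // every candidate is currently dead-listed
    }

    public static void main(String[] args) {
        DampenedDeadList<String> list = new DampenedDeadList<String>();
        String[] nodes = { "dn1", "dn2" };
        list.markDead("dn1");
        // dn2 is returned while dn1's counter climbs to 10; then dn1 comes back
        for (int i = 0; i < 11; i++) {
            System.out.println(i + ": " + list.bestNode(nodes));
        }
    }
}

One consequence visible in both the sketch and the patch: once the counter expires, the node is handed out on the very same bestNode() call, so a still-bad node is retried (and re-dead-listed) at most once per MAX_SKIPS selections rather than being shunned forever.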