Created May 20, 2010 20:21
Save toddlipcon/408035 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
commit 0e0d2cf7cb20847da94bab64d7cab9b0fbf792a0 | |
Author: Todd Lipcon <todd@cloudera.com> | |
Date: Wed May 12 01:57:26 2010 -0700 | |
Convert FSDataset to ReentrantReadWriteLock | |
diff --git src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java | |
index be46138..20bb583 100644 | |
--- src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java | |
+++ src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java | |
@@ -32,6 +32,7 @@ import java.util.List; | |
import java.util.Map; | |
import java.util.Random; | |
import java.util.TreeSet; | |
+import java.util.concurrent.locks.ReentrantReadWriteLock; | |
import javax.management.NotCompliantMBeanException; | |
import javax.management.ObjectName; | |
@@ -409,11 +410,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
} | |
void decDfsUsed(long value) { | |
- // The caller to this method (BlockFileDeleteTask.run()) does | |
- // not have locked FSDataset.this yet. | |
- synchronized(FSDataset.this) { | |
- dfsUsage.decDfsUsed(value); | |
- } | |
+ dfsUsage.decDfsUsed(value); | |
} | |
long getDfsUsed() throws IOException { | |
@@ -802,7 +799,10 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
} | |
/** Return the block file for the given ID */ | |
- public synchronized File findBlockFile(long blockId) { | |
+ public File findBlockFile(long blockId) { | |
+ try { | |
+ lock.readLock().lock(); | |
+ | |
final Block b = new Block(blockId); | |
File blockfile = null; | |
ActiveFile activefile = ongoingCreates.get(b); | |
@@ -819,10 +819,16 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
} | |
} | |
return blockfile; | |
+ } finally { | |
+ lock.readLock().unlock(); | |
+ } | |
} | |
/** {@inheritDoc} */ | |
- public synchronized Block getStoredBlock(long blkid) throws IOException { | |
+ public Block getStoredBlock(long blkid) throws IOException { | |
+ try { | |
+ lock.readLock().lock(); | |
+ | |
File blockfile = findBlockFile(blkid); | |
if (blockfile == null) { | |
return null; | |
@@ -831,6 +837,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
Block block = new Block(blkid); | |
return new Block(blkid, getVisibleLength(block), | |
parseGenerationStamp(blockfile, metafile)); | |
+ } finally { | |
+ lock.readLock().unlock(); | |
+ } | |
} | |
public boolean metaFileExists(Block b) throws IOException { | |
@@ -855,6 +864,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
HashMap<Block,DatanodeBlockInfo> volumeMap = new HashMap<Block, DatanodeBlockInfo>();; | |
static Random random = new Random(); | |
FSDatasetAsyncDiskService asyncDiskService; | |
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); | |
/** | |
* An FSDataset has a directory where it loads its data files. | |
@@ -911,7 +921,10 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
} | |
@Override | |
- public synchronized long getVisibleLength(Block b) throws IOException { | |
+ public long getVisibleLength(Block b) throws IOException { | |
+ try{ | |
+ lock.readLock().lock(); | |
+ | |
ActiveFile activeFile = ongoingCreates.get(b); | |
if (activeFile != null) { | |
@@ -919,11 +932,17 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
} else { | |
return getLength(b); | |
} | |
+ } finally { | |
+ lock.readLock().unlock(); | |
+ } | |
} | |
@Override | |
- public synchronized void setVisibleLength(Block b, long length) | |
+ public void setVisibleLength(Block b, long length) | |
throws IOException { | |
+ try { | |
+ lock.writeLock().lock(); | |
+ | |
ActiveFile activeFile = ongoingCreates.get(b); | |
if (activeFile != null) { | |
@@ -933,12 +952,18 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
String.format("block %s is not being written to", b) | |
); | |
} | |
+ } finally { | |
+ lock.writeLock().unlock(); | |
+ } | |
} | |
/** | |
* Get File name for a given block. | |
*/ | |
- public synchronized File getBlockFile(Block b) throws IOException { | |
+ public File getBlockFile(Block b) throws IOException { | |
+ try { | |
+ lock.readLock().lock(); | |
+ | |
File f = validateBlockFile(b); | |
if(f == null) { | |
if (InterDatanodeProtocol.LOG.isDebugEnabled()) { | |
@@ -947,13 +972,16 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
throw new IOException("Block " + b + " is not valid."); | |
} | |
return f; | |
+ } finally { | |
+ lock.readLock().unlock(); | |
+ } | |
} | |
- public synchronized InputStream getBlockInputStream(Block b) throws IOException { | |
+ public InputStream getBlockInputStream(Block b) throws IOException { | |
return new FileInputStream(getBlockFile(b)); | |
} | |
- public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException { | |
+ public InputStream getBlockInputStream(Block b, long seekOffset) throws IOException { | |
File blockFile = getBlockFile(b); | |
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); | |
@@ -966,18 +994,24 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
/** | |
* Returns handles to the block file and its metadata file | |
*/ | |
- public synchronized BlockInputStreams getTmpInputStreams(Block b, | |
+ public BlockInputStreams getTmpInputStreams(Block b, | |
long blkOffset, long ckoff) throws IOException { | |
+ File blockFile; | |
+ try { | |
+ lock.readLock().lock(); | |
DatanodeBlockInfo info = volumeMap.get(b); | |
if (info == null) { | |
throw new IOException("Block " + b + " does not exist in volumeMap."); | |
} | |
FSVolume v = info.getVolume(); | |
- File blockFile = info.getFile(); | |
+ blockFile = info.getFile(); | |
if (blockFile == null) { | |
blockFile = v.getTmpFile(b); | |
} | |
+ } finally { | |
+ lock.readLock().unlock(); | |
+ } | |
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r"); | |
if (blkOffset > 0) { | |
blockInFile.seek(blkOffset); | |
@@ -1009,8 +1043,12 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
public boolean detachBlock(Block block, int numLinks) throws IOException { | |
DatanodeBlockInfo info = null; | |
- synchronized (this) { | |
+ try { | |
+ lock.readLock().lock(); | |
+ | |
info = volumeMap.get(block); | |
+ } finally { | |
+ lock.readLock().unlock(); | |
} | |
return info.detachBlock(block, numLinks); | |
} | |
@@ -1074,8 +1112,11 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
* | |
* @return ongoing create threads if there is any. Otherwise, return null. | |
*/ | |
- private synchronized List<Thread> tryUpdateBlock( | |
+ private List<Thread> tryUpdateBlock( | |
Block oldblock, Block newblock) throws IOException { | |
+ try { | |
+ lock.writeLock().lock(); | |
+ | |
//check ongoing create threads | |
final ActiveFile activefile = ongoingCreates.get(oldblock); | |
if (activefile != null && !activefile.threads.isEmpty()) { | |
@@ -1138,6 +1179,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
// matches the block file on disk. | |
validateBlockMetadata(newblock); | |
return null; | |
+ } finally { | |
+ lock.writeLock().unlock(); | |
+ } | |
} | |
static void truncateBlock(File blockFile, File metaFile, | |
@@ -1232,7 +1276,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
// | |
File f = null; | |
List<Thread> threads = null; | |
- synchronized (this) { | |
+ try { | |
+ lock.writeLock().lock(); | |
+ | |
// | |
// Is it already in the create process? | |
// | |
@@ -1307,6 +1353,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
volumeMap.put(b, new DatanodeBlockInfo(v, f)); | |
} | |
ongoingCreates.put(b, new ActiveFile(f, threads)); | |
+ } finally { | |
+ lock.writeLock().unlock(); | |
} | |
try { | |
@@ -1361,8 +1409,11 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
file.getChannel().position(ckOffset); | |
} | |
- synchronized File createTmpFile( FSVolume vol, Block blk, | |
+ File createTmpFile( FSVolume vol, Block blk, | |
boolean replicationRequest) throws IOException { | |
+ try { | |
+ lock.writeLock().lock(); | |
+ | |
if ( vol == null ) { | |
vol = volumeMap.get( blk ).getVolume(); | |
if ( vol == null ) { | |
@@ -1370,6 +1421,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
} | |
} | |
return vol.createTmpFile(blk, replicationRequest); | |
+ } finally { | |
+ lock.writeLock().unlock(); | |
+ } | |
} | |
// | |
@@ -1394,8 +1448,11 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
/** | |
* Complete the block write! | |
*/ | |
- private synchronized void finalizeBlockInternal(Block b, boolean reFinalizeOk) | |
+ private void finalizeBlockInternal(Block b, boolean reFinalizeOk) | |
throws IOException { | |
+ try { | |
+ lock.writeLock().lock(); | |
+ | |
ActiveFile activeFile = ongoingCreates.get(b); | |
if (activeFile == null) { | |
if (reFinalizeOk) { | |
@@ -1418,13 +1475,19 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
dest = v.addBlock(b, f); | |
volumeMap.put(b, new DatanodeBlockInfo(v, dest)); | |
ongoingCreates.remove(b); | |
+ } finally { | |
+ lock.writeLock().unlock(); | |
+ } | |
} | |
/** | |
* is this block finalized? Returns true if the block is already | |
* finalized, otherwise returns false. | |
*/ | |
- private synchronized boolean isFinalized(Block b) { | |
+ private boolean isFinalized(Block b) { | |
+ try { | |
+ lock.readLock().lock(); | |
+ | |
FSVolume v = volumeMap.get(b).getVolume(); | |
if (v == null) { | |
DataNode.LOG.warn("No volume for block " + b); | |
@@ -1440,12 +1503,18 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
DataNode.LOG.warn("No temporary file " + f + " for block " + b); | |
} | |
return false; // block is not finalized | |
+ } finally { | |
+ lock.readLock().unlock(); | |
+ } | |
} | |
/** | |
* Remove the temporary block file (if any) | |
*/ | |
- public synchronized void unfinalizeBlock(Block b) throws IOException { | |
+ public void unfinalizeBlock(Block b) throws IOException { | |
+ try { | |
+ lock.writeLock().lock(); | |
+ | |
// remove the block from in-memory data structure | |
ActiveFile activefile = ongoingCreates.remove(b); | |
if (activefile == null) { | |
@@ -1457,6 +1526,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) { | |
DataNode.LOG.warn("Block " + b + " unfinalized and removed. " ); | |
} | |
+ } finally { | |
+ lock.writeLock().unlock(); | |
+ } | |
} | |
/** | |
@@ -1617,7 +1689,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
for (int i = 0; i < invalidBlks.length; i++) { | |
File f = null; | |
FSVolume v; | |
- synchronized (this) { | |
+ try { | |
+ lock.writeLock().lock(); | |
+ | |
f = getFile(invalidBlks[i]); | |
DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]); | |
if (dinfo == null) { | |
@@ -1654,6 +1728,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
} | |
v.clearPath(parent); | |
volumeMap.remove(invalidBlks[i]); | |
+ } finally { | |
+ lock.writeLock().unlock(); | |
} | |
File metaFile = getMetaFile( f, invalidBlks[i] ); | |
long dfsBytes = f.length() + metaFile.length(); | |
@@ -1669,12 +1745,18 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
/** | |
* Turn the block identifier into a filename. | |
*/ | |
- public synchronized File getFile(Block b) { | |
+ public File getFile(Block b) { | |
+ try { | |
+ lock.readLock().lock(); | |
+ | |
DatanodeBlockInfo info = volumeMap.get(b); | |
if (info != null) { | |
return info.getFile(); | |
} | |
return null; | |
+ } finally { | |
+ lock.readLock().unlock(); | |
+ } | |
} | |
/** | |
@@ -1694,7 +1776,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
// else | |
// remove related blocks | |
long mlsec = System.currentTimeMillis(); | |
- synchronized (this) { | |
+ try { | |
+ lock.writeLock().lock(); | |
+ | |
Iterator<Block> ib = volumeMap.keySet().iterator(); | |
while(ib.hasNext()) { | |
Block b = ib.next(); | |
@@ -1711,7 +1795,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface { | |
} | |
} | |
} | |
- } // end of sync | |
+ } finally { | |
+ lock.writeLock().unlock(); | |
+ } | |
mlsec = System.currentTimeMillis() - mlsec; | |
DataNode.LOG.warn(">>>>>>>>>>>>Removed " + removed_blocks + " out of " + total_blocks + | |
"(took " + mlsec + " millisecs)"); |
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.