Skip to content

Instantly share code, notes, and snippets.

@toddlipcon
Created May 20, 2010 20:21
Show Gist options
  • Save toddlipcon/408035 to your computer and use it in GitHub Desktop.
Save toddlipcon/408035 to your computer and use it in GitHub Desktop.
commit 0e0d2cf7cb20847da94bab64d7cab9b0fbf792a0
Author: Todd Lipcon <todd@cloudera.com>
Date: Wed May 12 01:57:26 2010 -0700
Convert FSDataset to ReentrantReadWriteLock
diff --git src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
index be46138..20bb583 100644
--- src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
+++ src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeSet;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -409,11 +410,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
}
void decDfsUsed(long value) {
- // The caller to this method (BlockFileDeleteTask.run()) does
- // not have locked FSDataset.this yet.
- synchronized(FSDataset.this) {
- dfsUsage.decDfsUsed(value);
- }
+ dfsUsage.decDfsUsed(value);
}
long getDfsUsed() throws IOException {
@@ -802,7 +799,10 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
}
/** Return the block file for the given ID */
- public synchronized File findBlockFile(long blockId) {
+ public File findBlockFile(long blockId) {
+ try {
+ lock.readLock().lock();
+
final Block b = new Block(blockId);
File blockfile = null;
ActiveFile activefile = ongoingCreates.get(b);
@@ -819,10 +819,16 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
}
}
return blockfile;
+ } finally {
+ lock.readLock().unlock();
+ }
}
/** {@inheritDoc} */
- public synchronized Block getStoredBlock(long blkid) throws IOException {
+ public Block getStoredBlock(long blkid) throws IOException {
+ try {
+ lock.readLock().lock();
+
File blockfile = findBlockFile(blkid);
if (blockfile == null) {
return null;
@@ -831,6 +837,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
Block block = new Block(blkid);
return new Block(blkid, getVisibleLength(block),
parseGenerationStamp(blockfile, metafile));
+ } finally {
+ lock.readLock().unlock();
+ }
}
public boolean metaFileExists(Block b) throws IOException {
@@ -855,6 +864,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
HashMap<Block,DatanodeBlockInfo> volumeMap = new HashMap<Block, DatanodeBlockInfo>();;
static Random random = new Random();
FSDatasetAsyncDiskService asyncDiskService;
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/**
* An FSDataset has a directory where it loads its data files.
@@ -911,7 +921,10 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
}
@Override
- public synchronized long getVisibleLength(Block b) throws IOException {
+ public long getVisibleLength(Block b) throws IOException {
+ try{
+ lock.readLock().lock();
+
ActiveFile activeFile = ongoingCreates.get(b);
if (activeFile != null) {
@@ -919,11 +932,17 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
} else {
return getLength(b);
}
+ } finally {
+ lock.readLock().unlock();
+ }
}
@Override
- public synchronized void setVisibleLength(Block b, long length)
+ public void setVisibleLength(Block b, long length)
throws IOException {
+ try {
+ lock.writeLock().lock();
+
ActiveFile activeFile = ongoingCreates.get(b);
if (activeFile != null) {
@@ -933,12 +952,18 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
String.format("block %s is not being written to", b)
);
}
+ } finally {
+ lock.writeLock().unlock();
+ }
}
/**
* Get File name for a given block.
*/
- public synchronized File getBlockFile(Block b) throws IOException {
+ public File getBlockFile(Block b) throws IOException {
+ try {
+ lock.readLock().lock();
+
File f = validateBlockFile(b);
if(f == null) {
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
@@ -947,13 +972,16 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
throw new IOException("Block " + b + " is not valid.");
}
return f;
+ } finally {
+ lock.readLock().unlock();
+ }
}
- public synchronized InputStream getBlockInputStream(Block b) throws IOException {
+ public InputStream getBlockInputStream(Block b) throws IOException {
return new FileInputStream(getBlockFile(b));
}
- public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
+ public InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
File blockFile = getBlockFile(b);
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
@@ -966,18 +994,24 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
/**
* Returns handles to the block file and its metadata file
*/
- public synchronized BlockInputStreams getTmpInputStreams(Block b,
+ public BlockInputStreams getTmpInputStreams(Block b,
long blkOffset, long ckoff) throws IOException {
+ File blockFile;
+ try {
+ lock.readLock().lock();
DatanodeBlockInfo info = volumeMap.get(b);
if (info == null) {
throw new IOException("Block " + b + " does not exist in volumeMap.");
}
FSVolume v = info.getVolume();
- File blockFile = info.getFile();
+ blockFile = info.getFile();
if (blockFile == null) {
blockFile = v.getTmpFile(b);
}
+ } finally {
+ lock.readLock().unlock();
+ }
RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
if (blkOffset > 0) {
blockInFile.seek(blkOffset);
@@ -1009,8 +1043,12 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
public boolean detachBlock(Block block, int numLinks) throws IOException {
DatanodeBlockInfo info = null;
- synchronized (this) {
+ try {
+ lock.readLock().lock();
+
info = volumeMap.get(block);
+ } finally {
+ lock.readLock().unlock();
}
return info.detachBlock(block, numLinks);
}
@@ -1074,8 +1112,11 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
*
* @return ongoing create threads if there is any. Otherwise, return null.
*/
- private synchronized List<Thread> tryUpdateBlock(
+ private List<Thread> tryUpdateBlock(
Block oldblock, Block newblock) throws IOException {
+ try {
+ lock.writeLock().lock();
+
//check ongoing create threads
final ActiveFile activefile = ongoingCreates.get(oldblock);
if (activefile != null && !activefile.threads.isEmpty()) {
@@ -1138,6 +1179,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
// matches the block file on disk.
validateBlockMetadata(newblock);
return null;
+ } finally {
+ lock.writeLock().unlock();
+ }
}
static void truncateBlock(File blockFile, File metaFile,
@@ -1232,7 +1276,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
//
File f = null;
List<Thread> threads = null;
- synchronized (this) {
+ try {
+ lock.writeLock().lock();
+
//
// Is it already in the create process?
//
@@ -1307,6 +1353,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
volumeMap.put(b, new DatanodeBlockInfo(v, f));
}
ongoingCreates.put(b, new ActiveFile(f, threads));
+ } finally {
+ lock.writeLock().unlock();
}
try {
@@ -1361,8 +1409,11 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
file.getChannel().position(ckOffset);
}
- synchronized File createTmpFile( FSVolume vol, Block blk,
+ File createTmpFile( FSVolume vol, Block blk,
boolean replicationRequest) throws IOException {
+ try {
+ lock.writeLock().lock();
+
if ( vol == null ) {
vol = volumeMap.get( blk ).getVolume();
if ( vol == null ) {
@@ -1370,6 +1421,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
}
}
return vol.createTmpFile(blk, replicationRequest);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
//
@@ -1394,8 +1448,11 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
/**
* Complete the block write!
*/
- private synchronized void finalizeBlockInternal(Block b, boolean reFinalizeOk)
+ private void finalizeBlockInternal(Block b, boolean reFinalizeOk)
throws IOException {
+ try {
+ lock.writeLock().lock();
+
ActiveFile activeFile = ongoingCreates.get(b);
if (activeFile == null) {
if (reFinalizeOk) {
@@ -1418,13 +1475,19 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
dest = v.addBlock(b, f);
volumeMap.put(b, new DatanodeBlockInfo(v, dest));
ongoingCreates.remove(b);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
/**
* is this block finalized? Returns true if the block is already
* finalized, otherwise returns false.
*/
- private synchronized boolean isFinalized(Block b) {
+ private boolean isFinalized(Block b) {
+ try {
+ lock.readLock().lock();
+
FSVolume v = volumeMap.get(b).getVolume();
if (v == null) {
DataNode.LOG.warn("No volume for block " + b);
@@ -1440,12 +1503,18 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
DataNode.LOG.warn("No temporary file " + f + " for block " + b);
}
return false; // block is not finalized
+ } finally {
+ lock.readLock().unlock();
+ }
}
/**
* Remove the temporary block file (if any)
*/
- public synchronized void unfinalizeBlock(Block b) throws IOException {
+ public void unfinalizeBlock(Block b) throws IOException {
+ try {
+ lock.writeLock().lock();
+
// remove the block from in-memory data structure
ActiveFile activefile = ongoingCreates.remove(b);
if (activefile == null) {
@@ -1457,6 +1526,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
}
+ } finally {
+ lock.writeLock().unlock();
+ }
}
/**
@@ -1617,7 +1689,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
for (int i = 0; i < invalidBlks.length; i++) {
File f = null;
FSVolume v;
- synchronized (this) {
+ try {
+ lock.writeLock().lock();
+
f = getFile(invalidBlks[i]);
DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]);
if (dinfo == null) {
@@ -1654,6 +1728,8 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
}
v.clearPath(parent);
volumeMap.remove(invalidBlks[i]);
+ } finally {
+ lock.writeLock().unlock();
}
File metaFile = getMetaFile( f, invalidBlks[i] );
long dfsBytes = f.length() + metaFile.length();
@@ -1669,12 +1745,18 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
/**
* Turn the block identifier into a filename.
*/
- public synchronized File getFile(Block b) {
+ public File getFile(Block b) {
+ try {
+ lock.readLock().lock();
+
DatanodeBlockInfo info = volumeMap.get(b);
if (info != null) {
return info.getFile();
}
return null;
+ } finally {
+ lock.readLock().unlock();
+ }
}
/**
@@ -1694,7 +1776,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
// else
// remove related blocks
long mlsec = System.currentTimeMillis();
- synchronized (this) {
+ try {
+ lock.writeLock().lock();
+
Iterator<Block> ib = volumeMap.keySet().iterator();
while(ib.hasNext()) {
Block b = ib.next();
@@ -1711,7 +1795,9 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
}
}
}
- } // end of sync
+ } finally {
+ lock.writeLock().unlock();
+ }
mlsec = System.currentTimeMillis() - mlsec;
DataNode.LOG.warn(">>>>>>>>>>>>Removed " + removed_blocks + " out of " + total_blocks +
"(took " + mlsec + " millisecs)");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment