DFSInputStream.java from CDH 4.3.0
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.token.Token;

import com.google.common.annotations.VisibleForTesting;

/****************************************************************
 * DFSInputStream provides bytes from a named file. It handles
 * negotiation of the namenode and various datanodes as necessary.
 ****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
  @VisibleForTesting
  static boolean tcpReadsDisabledForTesting = false;
  private final PeerCache peerCache;
  private final DFSClient dfsClient;
  private boolean closed = false;
  private final String src;
  private final long prefetchSize;
  private BlockReader blockReader = null;
  private final boolean verifyChecksum;
  private LocatedBlocks locatedBlocks = null;
  private long lastBlockBeingWrittenLength = 0;
  private DatanodeInfo currentNode = null;
  private LocatedBlock currentLocatedBlock = null;
  private long pos = 0;
  private long blockEnd = -1;
  private final ReadStatistics readStatistics = new ReadStatistics();

  public static class ReadStatistics {
    public ReadStatistics() {
      this.totalBytesRead = 0;
      this.totalLocalBytesRead = 0;
      this.totalShortCircuitBytesRead = 0;
    }

    public ReadStatistics(ReadStatistics rhs) {
      this.totalBytesRead = rhs.getTotalBytesRead();
      this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
      this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
    }
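
    // Note the counter hierarchy maintained by the add* methods below:
    //   totalBytesRead >= totalLocalBytesRead >= totalShortCircuitBytesRead.
    // For example, a 100-byte short-circuit read increments all three
    // counters, a local TCP read increments the first two, and a remote
    // read increments only totalBytesRead.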

    /**
     * @return The total bytes read. This will always be at least as
     * high as the other numbers, since it includes all of them.
     */
    public long getTotalBytesRead() {
      return totalBytesRead;
    }

    /**
     * @return The total local bytes read. This will always be at least
     * as high as totalShortCircuitBytesRead, since all short-circuit
     * reads are also local.
     */
    public long getTotalLocalBytesRead() {
      return totalLocalBytesRead;
    }

    /**
     * @return The total short-circuit local bytes read.
     */
    public long getTotalShortCircuitBytesRead() {
      return totalShortCircuitBytesRead;
    }

    /**
     * @return The total number of bytes read which were not local.
     */
    public long getRemoteBytesRead() {
      return totalBytesRead - totalLocalBytesRead;
    }

    void addRemoteBytes(long amt) {
      this.totalBytesRead += amt;
    }

    void addLocalBytes(long amt) {
      this.totalBytesRead += amt;
      this.totalLocalBytesRead += amt;
    }

    void addShortCircuitBytes(long amt) {
      this.totalBytesRead += amt;
      this.totalLocalBytesRead += amt;
      this.totalShortCircuitBytesRead += amt;
    }

    private long totalBytesRead;
    private long totalLocalBytesRead;
    private long totalShortCircuitBytesRead;
  }

  private final FileInputStreamCache fileInputStreamCache;

  /**
   * This variable tracks the number of failures since the start of the
   * most recent user-facing operation. That is to say, it should be reset
   * whenever the user makes a call on this stream, and if at any point
   * during the retry logic, the failure count exceeds a threshold,
   * the errors will be thrown back to the operation.
   *
   * Specifically this counts the number of times the client has gone
   * back to the namenode to get a new list of block locations, and is
   * capped at maxBlockAcquireFailures.
   */
  private int failures = 0;
  private final int timeWindow;

  /* XXX Use of ConcurrentHashMap is a temporary fix. Parallel accesses
   * to DFSInputStream (through preads) still need to be fixed properly. */
  private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
      new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
  private int buffersize = 1;
  private final byte[] oneByteBuf = new byte[1]; // used for 'int read()'
  private final int nCachedConnRetry;

  void addToDeadNodes(DatanodeInfo dnInfo) {
    deadNodes.put(dnInfo, dnInfo);
  }

  DFSInputStream(DFSClient dfsClient, String src, int buffersize,
      boolean verifyChecksum) throws IOException, UnresolvedLinkException {
    this.dfsClient = dfsClient;
    this.verifyChecksum = verifyChecksum;
    this.buffersize = buffersize;
    this.src = src;
    this.peerCache = dfsClient.peerCache;
    this.fileInputStreamCache = new FileInputStreamCache(
        dfsClient.conf.getInt(DFSConfigKeys.
            DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_KEY,
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_SIZE_DEFAULT),
        dfsClient.conf.getLong(DFSConfigKeys.
            DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY,
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_DEFAULT));
    prefetchSize = dfsClient.getConf().prefetchSize;
    timeWindow = dfsClient.getConf().timeWindow;
    nCachedConnRetry = dfsClient.getConf().nCachedConnRetry;
    openInfo();
  }

  /**
   * Grab the open-file info from namenode
   */
  synchronized void openInfo() throws IOException, UnresolvedLinkException {
    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
    int retriesForLastBlockLength = 3;
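    // If the last block's length is not yet available, this loop sleeps a
    // fixed 4 seconds per attempt, i.e. at most 3 * 4000 ms = 12 seconds in
    // total, before giving up.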
    while (retriesForLastBlockLength > 0) {
      // Getting the last block length as -1 is a special case. When the
      // cluster restarts, DNs may not report in immediately. At that point
      // partial block locations will not be available from the NN for
      // computing the length. Retry up to 3 times to get the length.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(4000);
        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }

  private void waitFor(int waitTime) throws IOException {
    try {
      Thread.sleep(waitTime);
    } catch (InterruptedException e) {
      throw new IOException(
          "Interrupted while getting the last block length.");
    }
  }

  private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
    LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0, prefetchSize);
    if (DFSClient.LOG.isDebugEnabled()) {
      DFSClient.LOG.debug("newInfo = " + newInfo);
    }
    if (newInfo == null) {
      throw new IOException("Cannot open filename " + src);
    }

    if (locatedBlocks != null) {
      Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
      Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
      while (oldIter.hasNext() && newIter.hasNext()) {
        if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
          throw new IOException("Blocklist for " + src + " has changed!");
        }
      }
    }
    locatedBlocks = newInfo;
    long lastBlockBeingWrittenLength = 0;
    if (!locatedBlocks.isLastBlockComplete()) {
      final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
      if (last != null) {
        if (last.getLocations().length == 0) {
          return -1;
        }
        final long len = readBlockLength(last);
        last.getBlock().setNumBytes(len);
        lastBlockBeingWrittenLength = len;
      }
    }

    currentNode = null;
    return lastBlockBeingWrittenLength;
  }

  /** Read the block length from one of the datanodes. */
  private long readBlockLength(LocatedBlock locatedblock) throws IOException {
    assert locatedblock != null : "LocatedBlock cannot be null";
    int replicaNotFoundCount = locatedblock.getLocations().length;

    for (DatanodeInfo datanode : locatedblock.getLocations()) {
      ClientDatanodeProtocol cdp = null;
      try {
        cdp = DFSUtil.createClientDatanodeProtocolProxy(
            datanode, dfsClient.conf, dfsClient.getConf().socketTimeout,
            dfsClient.getConf().connectToDnViaHostname, locatedblock);
        final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
        if (n >= 0) {
          return n;
        }
      } catch (IOException ioe) {
        if (ioe instanceof RemoteException &&
            (((RemoteException) ioe).unwrapRemoteException() instanceof
                ReplicaNotFoundException)) {
          // special case: replica might not be on the DN, treat as 0 length
          replicaNotFoundCount--;
        }
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Failed to getReplicaVisibleLength from datanode "
              + datanode + " for block " + locatedblock.getBlock(), ioe);
        }
      } finally {
        if (cdp != null) {
          RPC.stopProxy(cdp);
        }
      }
    }

    // The namenode told us about these locations, but none of them knows
    // about the replica. This means we hit the race between pipeline
    // creation start and end. We require every location to have failed with
    // ReplicaNotFoundException before treating the length as 0, because some
    // other exception could have happened on a DN that does have the
    // replica, and we want to report that error instead.
    if (replicaNotFoundCount == 0) {
      return 0;
    }

    throw new IOException("Cannot obtain block length for " + locatedblock);
  }

  public synchronized long getFileLength() {
    return locatedBlocks == null ? 0 :
        locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
  }

  // Short circuit local reads are forbidden for files that are
  // under construction. See HDFS-2757.
  synchronized boolean shortCircuitForbidden() {
    return locatedBlocks.isUnderConstruction();
  }

  /**
   * Returns the datanode from which the stream is currently reading.
   */
  public DatanodeInfo getCurrentDatanode() {
    return currentNode;
  }

  /**
   * Returns the block containing the target position.
   */
  synchronized public ExtendedBlock getCurrentBlock() {
    if (currentLocatedBlock == null) {
      return null;
    }
    return currentLocatedBlock.getBlock();
  }

  /**
   * Return the collection of blocks that have already been located.
   */
  public synchronized List<LocatedBlock> getAllBlocks() throws IOException {
    return getBlockRange(0, getFileLength());
  }

  /**
   * Get block at the specified position.
   * Fetch it from the namenode if not cached.
   *
   * @param offset block offset
   * @param updatePosition whether to update current position
   * @return located block
   * @throws IOException
   */
  private synchronized LocatedBlock getBlockAt(long offset,
      boolean updatePosition) throws IOException {
    assert (locatedBlocks != null) : "locatedBlocks is null";

    final LocatedBlock blk;

    // check offset
    if (offset < 0 || offset >= getFileLength()) {
      throw new IOException("offset < 0 || offset >= getFileLength(), offset="
          + offset
          + ", updatePosition=" + updatePosition
          + ", locatedBlocks=" + locatedBlocks);
    }
    else if (offset >= locatedBlocks.getFileLength()) {
      // offset is within the portion of the last block
      // which is not yet known to the name-node;
      // return the last block
      blk = locatedBlocks.getLastLocatedBlock();
    }
    else {
      // search cached blocks first
      int targetBlockIdx = locatedBlocks.findBlock(offset);
      if (targetBlockIdx < 0) { // block is not cached
        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
        // fetch more blocks
        LocatedBlocks newBlocks;
        newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
        assert (newBlocks != null) : "Could not find target position " + offset;
        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
      }
      blk = locatedBlocks.get(targetBlockIdx);
    }

    // update current position
    if (updatePosition) {
      pos = offset;
      blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
      currentLocatedBlock = blk;
    }
    return blk;
  }

  /** Fetch a block from namenode and cache it */
  private synchronized void fetchBlockAt(long offset) throws IOException {
    int targetBlockIdx = locatedBlocks.findBlock(offset);
    if (targetBlockIdx < 0) { // block is not cached
      targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
    }
    // fetch blocks
    LocatedBlocks newBlocks;
    newBlocks = dfsClient.getLocatedBlocks(src, offset, prefetchSize);
    if (newBlocks == null) {
      throw new IOException("Could not find target position " + offset);
    }
    locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
  }

  /**
   * Get blocks in the specified range.
   * Fetch them from the namenode if not cached. This function
   * will not issue a read request beyond the EOF.
   * @param offset starting offset in the file
   * @param length length of the data to read
   * @return a consecutive segment of located blocks
   * @throws IOException
   */
  private synchronized List<LocatedBlock> getBlockRange(long offset,
      long length) throws IOException {
    // getFileLength(): returns total file length
    // locatedBlocks.getFileLength(): returns length of completed blocks
    if (offset >= getFileLength()) {
      throw new IOException("Offset: " + offset +
          " exceeds file length: " + getFileLength());
    }

    final List<LocatedBlock> blocks;
    final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
    final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
    final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
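
    // For example, with a 1000-byte finalized prefix and an in-progress last
    // block, a read at offset=900 with length=200 takes 100 bytes from the
    // finalized range below and the remainder from the last (incomplete)
    // block appended afterwards.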
    if (readOffsetWithinCompleteBlk) {
      // get the blocks of the finalized (completed) block range
      blocks = getFinalizedBlockRange(offset,
          Math.min(length, lengthOfCompleteBlk - offset));
    } else {
      blocks = new ArrayList<LocatedBlock>(1);
    }

    // get the blocks from the incomplete block range
    if (readLengthPastCompleteBlk) {
      blocks.add(locatedBlocks.getLastLocatedBlock());
    }
    return blocks;
  }

  /**
   * Get blocks in the specified range.
   * Includes only the complete blocks.
   * Fetch them from the namenode if not cached.
   */
  private synchronized List<LocatedBlock> getFinalizedBlockRange(
      long offset, long length) throws IOException {
    assert (locatedBlocks != null) : "locatedBlocks is null";
    List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
    // search cached blocks first
    int blockIdx = locatedBlocks.findBlock(offset);
    if (blockIdx < 0) { // block is not cached
      blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
    }
    long remaining = length;
    long curOff = offset;
    while (remaining > 0) {
      LocatedBlock blk = null;
      if (blockIdx < locatedBlocks.locatedBlockCount())
        blk = locatedBlocks.get(blockIdx);
      if (blk == null || curOff < blk.getStartOffset()) {
        LocatedBlocks newBlocks;
        newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
        locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
        continue;
      }
      assert curOff >= blk.getStartOffset() : "Block not found";
      blockRange.add(blk);
      long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
      remaining -= bytesRead;
      curOff += bytesRead;
      blockIdx++;
    }
    return blockRange;
  }

  /**
   * Open a DataInputStream to a DataNode so that it can be read from.
   * We get block ID and the IDs of the destinations at startup, from the namenode.
   */
  private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
    if (target >= getFileLength()) {
      throw new IOException("Attempted to read past end of file");
    }

    // Will be getting a new BlockReader.
    if (blockReader != null) {
      blockReader.close();
      blockReader = null;
    }

    //
    // Connect to best DataNode for desired Block, with potential offset
    //
    DatanodeInfo chosenNode = null;
    int refetchToken = 1; // only need to get a new access token once
    int refetchEncryptionKey = 1; // only need to get a new encryption key once
    boolean connectFailedOnce = false;
    while (true) {
      //
      // Compute desired block
      //
      LocatedBlock targetBlock = getBlockAt(target, true);
      assert (target == pos) : "Wrong position " + pos + ", expected " + target;
      long offsetIntoBlock = target - targetBlock.getStartOffset();

      DNAddrPair retval = chooseDataNode(targetBlock);
      chosenNode = retval.info;
      InetSocketAddress targetAddr = retval.addr;

      try {
        ExtendedBlock blk = targetBlock.getBlock();
        Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
        blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
            accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
            buffersize, verifyChecksum, dfsClient.clientName);
        if (connectFailedOnce) {
          DFSClient.LOG.info("Successfully connected to " + targetAddr +
              " for " + blk);
        }
        return chosenNode;
      } catch (IOException ex) {
        if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
              + "encryption key was invalid when connecting to " + targetAddr
              + " : " + ex);
          // The encryption key used is invalid.
          refetchEncryptionKey--;
          dfsClient.clearDataEncryptionKey();
        } else if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
          DFSClient.LOG.info("Will fetch a new access token and retry, "
              + "access token was invalid when connecting to " + targetAddr
              + " : " + ex);
          /*
           * Get a new access token and retry. Retry is needed in 2 cases. 1)
           * When both NN and DN re-started while DFSClient holding a cached
           * access token. 2) In the case that NN fails to update its
           * access key at pre-set interval (by a wide margin) and
           * subsequently restarts. In this case, DN re-registers itself with
           * NN and receives a new access key, but DN will delete the old
           * access key from its memory since it's considered expired based on
           * the estimated expiration date.
           */
          refetchToken--;
          fetchBlockAt(target);
        } else {
          connectFailedOnce = true;
          DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
              + ", add to deadNodes and continue. " + ex, ex);
          // Put chosen node into dead list, continue
          addToDeadNodes(chosenNode);
        }
      }
    }
  }

  /**
   * Close it down!
   */
  @Override
  public synchronized void close() throws IOException {
    if (closed) {
      return;
    }
    dfsClient.checkOpen();

    if (blockReader != null) {
      blockReader.close();
      blockReader = null;
    }
    super.close();
    fileInputStreamCache.close();
    closed = true;
  }

  @Override
  public synchronized int read() throws IOException {
    int ret = read(oneByteBuf, 0, 1);
    return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff);
  }

  /**
   * Wraps different possible read implementations so that readBuffer can be
   * strategy-agnostic.
   */
  private interface ReaderStrategy {
    public int doRead(BlockReader blockReader, int off, int len,
        ReadStatistics readStatistics) throws ChecksumException, IOException;
  }

  private static void updateReadStatistics(ReadStatistics readStatistics,
      int nRead, BlockReader blockReader) {
    if (nRead <= 0) return;
    if (blockReader.isShortCircuit()) {
      readStatistics.totalBytesRead += nRead;
      readStatistics.totalLocalBytesRead += nRead;
      readStatistics.totalShortCircuitBytesRead += nRead;
    } else if (blockReader.isLocal()) {
      readStatistics.totalBytesRead += nRead;
      readStatistics.totalLocalBytesRead += nRead;
    } else {
      readStatistics.totalBytesRead += nRead;
    }
  }

  /**
   * Used to read bytes into a byte[]
   */
  private static class ByteArrayStrategy implements ReaderStrategy {
    final byte[] buf;

    public ByteArrayStrategy(byte[] buf) {
      this.buf = buf;
    }

    @Override
    public int doRead(BlockReader blockReader, int off, int len,
        ReadStatistics readStatistics) throws ChecksumException, IOException {
      int nRead = blockReader.read(buf, off, len);
      updateReadStatistics(readStatistics, nRead, blockReader);
      return nRead;
    }
  }

  /**
   * Used to read bytes into a user-supplied ByteBuffer
   */
  private static class ByteBufferStrategy implements ReaderStrategy {
    final ByteBuffer buf;

    ByteBufferStrategy(ByteBuffer buf) {
      this.buf = buf;
    }

    @Override
    public int doRead(BlockReader blockReader, int off, int len,
        ReadStatistics readStatistics) throws ChecksumException, IOException {
      int oldpos = buf.position();
      int oldlimit = buf.limit();
      boolean success = false;
      try {
        int ret = blockReader.read(buf);
        success = true;
        updateReadStatistics(readStatistics, ret, blockReader);
        return ret;
      } finally {
        if (!success) {
          // Reset to original state so that retries work correctly.
          buf.position(oldpos);
          buf.limit(oldlimit);
        }
      }
    }
  }

  /* This is used by the regular read() path and handles ChecksumExceptions.
   * The name readBuffer() is chosen to imply similarity to readBuffer() in
   * ChecksumFileSystem.
   */
  private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
      throws IOException {
    IOException ioe;

    /* We retry the current node only once, so this is set to true only here.
     * The intention is to handle one common case of an error that is not a
     * failure on the datanode or the client: the DataNode closing the
     * connection because the client is idle. If there are other cases of
     * "non-errors", a datanode might be retried by setting this to true
     * again.
     */
    boolean retryCurrentNode = true;

    while (true) {
      // retry as many times as seekToNewSource allows.
      try {
        return reader.doRead(blockReader, off, len, readStatistics);
      } catch (ChecksumException ce) {
        DFSClient.LOG.warn("Found Checksum error for "
            + getCurrentBlock() + " from " + currentNode
            + " at " + ce.getPos());
        ioe = ce;
        retryCurrentNode = false;
        // we want to remember which block replicas we have tried
        addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
            corruptedBlockMap);
      } catch (IOException e) {
        if (!retryCurrentNode) {
          DFSClient.LOG.warn("Exception while reading from "
              + getCurrentBlock() + " of " + src + " from "
              + currentNode, e);
        }
        ioe = e;
      }
      boolean sourceFound = false;
      if (retryCurrentNode) {
        /* possibly retry the same node so that transient errors don't
         * result in application level failures (e.g. Datanode could have
         * closed the connection because the client is idle for too long).
         */
        sourceFound = seekToBlockSource(pos);
      } else {
        addToDeadNodes(currentNode);
        sourceFound = seekToNewSource(pos);
      }
      if (!sourceFound) {
        throw ioe;
      }
      retryCurrentNode = false;
    }
  }

  private int readWithStrategy(ReaderStrategy strategy, int off, int len)
      throws IOException {
    dfsClient.checkOpen();
    if (closed) {
      throw new IOException("Stream closed");
    }
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap
        = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
    failures = 0;
    if (pos < getFileLength()) {
      int retries = 2;
      while (retries > 0) {
        try {
          // currentNode can be left as null if previous read had a checksum
          // error on the same block. See HDFS-3067
          if (pos > blockEnd || currentNode == null) {
            currentNode = blockSeekTo(pos);
          }
          int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
          int result = readBuffer(strategy, off, realLen, corruptedBlockMap);

          if (result >= 0) {
            pos += result;
          } else {
            // got an EOS from the reader even though we expected more data
            throw new IOException("Unexpected EOS from the reader");
          }
          if (dfsClient.stats != null && result != -1) {
            dfsClient.stats.incrementBytesRead(result);
          }
          return result;
        } catch (ChecksumException ce) {
          throw ce;
        } catch (IOException e) {
          if (retries == 1) {
            DFSClient.LOG.warn("DFS Read", e);
          }
          blockEnd = -1;
          if (currentNode != null) { addToDeadNodes(currentNode); }
          if (--retries == 0) {
            throw e;
          }
        } finally {
          // Report corrupted block replicas if needed, whether the read
          // succeeded or a ChecksumException occurred.
          reportCheckSumFailure(corruptedBlockMap,
              currentLocatedBlock.getLocations().length);
        }
      }
    }
    return -1;
  }

  /**
   * Read the entire buffer.
   */
  @Override
  public synchronized int read(final byte buf[], int off, int len) throws IOException {
    ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
    return readWithStrategy(byteArrayReader, off, len);
  }

  @Override
  public synchronized int read(final ByteBuffer buf) throws IOException {
    ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
    return readWithStrategy(byteBufferReader, 0, buf.remaining());
  }

  /**
   * Add corrupted block replica into map.
   * @param corruptedBlockMap
   */
  private void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap) {
    Set<DatanodeInfo> dnSet = null;
    if (corruptedBlockMap.containsKey(blk)) {
      dnSet = corruptedBlockMap.get(blk);
    } else {
      dnSet = new HashSet<DatanodeInfo>();
    }
    if (!dnSet.contains(node)) {
      dnSet.add(node);
      corruptedBlockMap.put(blk, dnSet);
    }
  }

  private DNAddrPair chooseDataNode(LocatedBlock block)
      throws IOException {
    while (true) {
      DatanodeInfo[] nodes = block.getLocations();
      try {
        DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
        final String dnAddr =
            chosenNode.getXferAddr(dfsClient.connectToDnViaHostname());
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
        }
        InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
        return new DNAddrPair(chosenNode, targetAddr);
      } catch (IOException ie) {
        String blockInfo = block.getBlock() + " file=" + src;
        if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
          throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
              block.getStartOffset());
        }

        if (nodes == null || nodes.length == 0) {
          DFSClient.LOG.info("No node available for " + blockInfo);
        }
        DFSClient.LOG.info("Could not obtain " + block.getBlock()
            + " from any node: " + ie
            + ". Will get new block locations from namenode and retry...");
        try {
          // Introducing a random factor to the wait time before another retry.
          // The wait time is dependent on # of failures and a random factor.
          // At the first time of getting a BlockMissingException, the wait time
          // is a random number between 0..3000 ms. If the first retry
          // still fails, we will wait 3000 ms grace period before the 2nd retry.
          // Also at the second retry, the waiting window is expanded to 6000 ms
          // alleviating the request rate from the server. Similarly the 3rd retry
          // will wait 6000ms grace period before retry and the waiting window is
          // expanded to 9000ms.
          double waitTime = timeWindow * failures +       // grace period for the last round of attempt
              timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
          DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
              " IOException, will wait for " + waitTime + " msec.");
          Thread.sleep((long) waitTime);
        } catch (InterruptedException iex) {
        }
        deadNodes.clear(); // 2nd option is to remove only nodes[blockId]
        openInfo();
        block = getBlockAt(block.getStartOffset(), false);
        failures++;
        continue;
      }
    }
  }

  private void fetchBlockByteRange(LocatedBlock block, long start, long end,
      byte[] buf, int offset,
      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
      throws IOException {
    //
    // Connect to best DataNode for desired Block, with potential offset
    //
    int refetchToken = 1; // only need to get a new access token once
    int refetchEncryptionKey = 1; // only need to get a new encryption key once

    while (true) {
      // cached block locations may have been updated by chooseDataNode()
      // or fetchBlockAt(). Always get the latest list of locations at the
      // start of the loop.
      block = getBlockAt(block.getStartOffset(), false);
      DNAddrPair retval = chooseDataNode(block);
      DatanodeInfo chosenNode = retval.info;
      InetSocketAddress targetAddr = retval.addr;
      BlockReader reader = null;

      try {
        Token<BlockTokenIdentifier> blockToken = block.getBlockToken();
        int len = (int) (end - start + 1);
        reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
            blockToken, start, len, buffersize, verifyChecksum,
            dfsClient.clientName);
        int nread = reader.readAll(buf, offset, len);
        if (nread != len) {
throw new IOException("truncated return from reader.read(): " + | |
"excpected " + len + ", got " + nread); | |
        }
        return;
      } catch (ChecksumException e) {
        DFSClient.LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
            src + " at " + block.getBlock() + ":" +
            e.getPos() + " from " + chosenNode);
        // we want to remember what we have tried
        addIntoCorruptedBlockMap(block.getBlock(), chosenNode, corruptedBlockMap);
      } catch (IOException e) {
        if (e instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
          DFSClient.LOG.info("Will fetch a new encryption key and retry, "
              + "encryption key was invalid when connecting to " + targetAddr
              + " : " + e);
          // The encryption key used is invalid.
          refetchEncryptionKey--;
          dfsClient.clearDataEncryptionKey();
        } else if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
          DFSClient.LOG.info("Will get a new access token and retry, "
              + "access token was invalid when connecting to " + targetAddr
              + " : " + e);
          refetchToken--;
          fetchBlockAt(block.getStartOffset());
          continue;
        } else {
          DFSClient.LOG.warn("Failed to connect to " + targetAddr +
              " for file " + src + " for block " + block.getBlock() + ":" + e);
          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("Connection failure ", e);
          }
        }
      } finally {
        if (reader != null) {
          reader.close();
        }
      }
      // Put chosen node into dead list, continue
      addToDeadNodes(chosenNode);
    }
  }

  private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
    Peer peer = null;
    boolean success = false;
    Socket sock = null;
    try {
      sock = dfsClient.socketFactory.createSocket();
      NetUtils.connect(sock, addr,
          dfsClient.getRandomLocalInterfaceAddr(),
          dfsClient.getConf().socketTimeout);
      peer = TcpPeerServer.peerFromSocketAndKey(sock,
          dfsClient.getDataEncryptionKey());
      success = true;
      return peer;
    } finally {
      if (!success) {
        IOUtils.closeQuietly(peer);
        IOUtils.closeQuietly(sock);
      }
    }
  }

  /**
   * Retrieve a BlockReader suitable for reading.
   * This method will reuse the cached connection to the DN if appropriate.
   * Otherwise, it will create a new connection.
   * Throwing an IOException from this method is basically equivalent to
   * declaring the DataNode bad, so we try to connect a lot of different ways
   * before doing that.
   *
   * @param dnAddr Address of the datanode
   * @param chosenNode Chosen datanode information
   * @param file File location
   * @param block The Block object
   * @param blockToken The access token for security
   * @param startOffset The read offset, relative to block head
   * @param len The number of bytes to read
   * @param bufferSize The IO buffer size (not the client buffer size)
   * @param verifyChecksum Whether to verify checksum
   * @param clientName Client name
   * @return New BlockReader instance
   */
  protected BlockReader getBlockReader(InetSocketAddress dnAddr,
      DatanodeInfo chosenNode,
      String file,
      ExtendedBlock block,
      Token<BlockTokenIdentifier> blockToken,
      long startOffset,
      long len,
      int bufferSize,
      boolean verifyChecksum,
      String clientName)
      throws IOException {
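    // Reader selection falls through several tiers, cheapest first:
    //   1. cached local file descriptors (short-circuit reads)
    //   2. the legacy BlockReaderLocalLegacy (HDFS-2246 style), if enabled
    //   3. cached domain-socket peers, then a freshly created domain socket
    //   4. cached TCP peers, then a brand-new TCP connection
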
    // Firstly, we check to see if we have cached any file descriptors for
    // local blocks. If so, we can just re-use those file descriptors.
    FileInputStream fis[] = fileInputStreamCache.get(chosenNode, block);
    if (fis != null) {
      if (DFSClient.LOG.isDebugEnabled()) {
        DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
            "the FileInputStreamCache.");
      }
      return new BlockReaderLocal(dfsClient.conf, file,
          block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum,
          fileInputStreamCache);
    }

    // If the legacy local block reader is enabled and we are reading a local
    // block, try to create a BlockReaderLocalLegacy. The legacy local block
    // reader implements local reads in the style first introduced by HDFS-2246.
    if ((dfsClient.useLegacyBlockReaderLocal()) &&
        DFSClient.isLocalAddress(dnAddr) &&
        (!shortCircuitForbidden())) {
      try {
        return BlockReaderFactory.getLegacyBlockReaderLocal(dfsClient.ugi,
            dfsClient.conf, clientName, block, blockToken, chosenNode,
            dfsClient.hdfsTimeout, startOffset, dfsClient.connectToDnViaHostname());
      } catch (IOException e) {
        DFSClient.LOG.warn("error creating legacy BlockReaderLocal. " +
            "Disabling legacy local reads.", e);
        dfsClient.disableLegacyBlockReaderLocal();
      }
    }

    // Look for cached domain peers.
    int cacheTries = 0;
    DomainSocketFactory dsFactory = dfsClient.getDomainSocketFactory();
    BlockReader reader = null;
    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
      Peer peer = peerCache.get(chosenNode, true);
      if (peer == null) break;
      try {
        boolean allowShortCircuitLocalReads = dfsClient.getConf().
            shortCircuitLocalReads && (!shortCircuitForbidden());
        reader = BlockReaderFactory.newBlockReader(
            dfsClient.conf, file, block, blockToken, startOffset,
            len, verifyChecksum, clientName, peer, chosenNode,
            dsFactory, peerCache, fileInputStreamCache,
            allowShortCircuitLocalReads);
        return reader;
      } catch (IOException ex) {
        DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
            "Closing stale " + peer, ex);
      } finally {
        if (reader == null) {
          IOUtils.closeQuietly(peer);
        }
      }
    }

    // Try to create a DomainPeer.
    DomainSocket domSock = dsFactory.create(dnAddr, this);
    if (domSock != null) {
      Peer peer = new DomainPeer(domSock);
      try {
        boolean allowShortCircuitLocalReads = dfsClient.getConf().
            shortCircuitLocalReads && (!shortCircuitForbidden());
        reader = BlockReaderFactory.newBlockReader(
            dfsClient.conf, file, block, blockToken, startOffset,
            len, verifyChecksum, clientName, peer, chosenNode,
            dsFactory, peerCache, fileInputStreamCache,
            allowShortCircuitLocalReads);
        return reader;
      } catch (IOException e) {
        DFSClient.LOG.warn("failed to connect to " + domSock, e);
      } finally {
        if (reader == null) {
          // If the Peer that we got the error from was a DomainPeer,
          // mark the socket path as bad, so that newDataSocket will not try
          // to re-open this socket for a while.
          dsFactory.disableDomainSocketPath(domSock.getPath());
          IOUtils.closeQuietly(peer);
        }
      }
    }

    // Look for cached peers.
    for (; cacheTries < nCachedConnRetry; ++cacheTries) {
      Peer peer = peerCache.get(chosenNode, false);
      if (peer == null) break;
      try {
        reader = BlockReaderFactory.newBlockReader(
            dfsClient.conf, file, block, blockToken, startOffset,
            len, verifyChecksum, clientName, peer, chosenNode,
            dsFactory, peerCache, fileInputStreamCache, false);
        return reader;
      } catch (IOException ex) {
        DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
            peer, ex);
      } finally {
        if (reader == null) {
          IOUtils.closeQuietly(peer);
        }
      }
    }
    if (tcpReadsDisabledForTesting) {
      throw new IOException("TCP reads are disabled.");
    }
    // Try to create a new remote peer.
    Peer peer = newTcpPeer(dnAddr);
    return BlockReaderFactory.newBlockReader(
        dfsClient.conf, file, block, blockToken, startOffset,
        len, verifyChecksum, clientName, peer, chosenNode,
        dsFactory, peerCache, fileInputStreamCache, false);
  }

  /**
   * Read bytes starting from the specified position.
   *
   * @param position start read from this position
   * @param buffer read buffer
   * @param offset offset into buffer
   * @param length number of bytes to read
   *
   * @return actual number of bytes read
   */
  @Override
  public int read(long position, byte[] buffer, int offset, int length)
      throws IOException {
    // sanity checks
    dfsClient.checkOpen();
    if (closed) {
      throw new IOException("Stream closed");
    }
    failures = 0;
    long filelen = getFileLength();
    if ((position < 0) || (position >= filelen)) {
      return -1;
    }
    int realLen = length;
    if ((position + length) > filelen) {
      realLen = (int) (filelen - position);
    }

    // determine the block and byte range within the block
    // corresponding to position and realLen
    List<LocatedBlock> blockRange = getBlockRange(position, realLen);
    int remaining = realLen;
    Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap
        = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
    for (LocatedBlock blk : blockRange) {
      long targetStart = position - blk.getStartOffset();
      long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
      try {
        fetchBlockByteRange(blk, targetStart,
            targetStart + bytesToRead - 1, buffer, offset, corruptedBlockMap);
      } finally {
        // Check and report if any block replicas are corrupted.
        // BlockMissingException may be caught if all block replicas are
        // corrupted.
        reportCheckSumFailure(corruptedBlockMap, blk.getLocations().length);
      }

      remaining -= bytesToRead;
      position += bytesToRead;
      offset += bytesToRead;
    }
    assert remaining == 0 : "Wrong number of bytes read.";
    if (dfsClient.stats != null) {
      dfsClient.stats.incrementBytesRead(realLen);
    }
    return realLen;
  }

  /**
   * DFSInputStream reports checksum failure.
   * Case I: the client has tried multiple data nodes and at least one of the
   * attempts has succeeded. We report the other failures as corrupted block
   * replicas to the namenode.
   * Case II: the client has tried all data nodes and all failed. We only
   * report if the total number of replicas is 1. We do not report otherwise,
   * since the failures may be caused by the client itself being unable to
   * read.
   * @param corruptedBlockMap map of corrupted blocks
   * @param dataNodeCount number of data nodes that contain the block replicas
   */
  private void reportCheckSumFailure(
      Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
      int dataNodeCount) {
    if (corruptedBlockMap.isEmpty()) {
      return;
    }
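
    // Concretely: with 3 replicas, 1 or 2 corrupted entries are reported,
    // while all 3 being corrupted is not (Case II above); with a single
    // replica, its corruption is always reported.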
Iterator<Entry<ExtendedBlock, Set<DatanodeInfo>>> it = corruptedBlockMap | |
.entrySet().iterator(); | |
Entry<ExtendedBlock, Set<DatanodeInfo>> entry = it.next(); | |
ExtendedBlock blk = entry.getKey(); | |
Set<DatanodeInfo> dnSet = entry.getValue(); | |
if (((dnSet.size() < dataNodeCount) && (dnSet.size() > 0)) | |
|| ((dataNodeCount == 1) && (dnSet.size() == dataNodeCount))) { | |
DatanodeInfo[] locs = new DatanodeInfo[dnSet.size()]; | |
int i = 0; | |
for (DatanodeInfo dn:dnSet) { | |
locs[i++] = dn; | |
} | |
LocatedBlock [] lblocks = { new LocatedBlock(blk, locs) }; | |
dfsClient.reportChecksumFailure(src, lblocks); | |
} | |
corruptedBlockMap.clear(); | |
} | |
@Override | |
public long skip(long n) throws IOException { | |
if ( n > 0 ) { | |
long curPos = getPos(); | |
long fileLen = getFileLength(); | |
if( n+curPos > fileLen ) { | |
n = fileLen - curPos; | |
} | |
seek(curPos+n); | |
return n; | |
} | |
return n < 0 ? -1 : 0; | |
} | |
/** | |
* Seek to a new arbitrary location | |
*/ | |
@Override | |
public synchronized void seek(long targetPos) throws IOException { | |
if (targetPos > getFileLength()) { | |
throw new IOException("Cannot seek after EOF"); | |
} | |
if (closed) { | |
throw new IOException("Stream is closed!"); | |
} | |
boolean done = false; | |
if (pos <= targetPos && targetPos <= blockEnd) { | |
// | |
// If this seek is to a positive position in the current | |
// block, and this piece of data might already be lying in | |
// the TCP buffer, then just eat up the intervening data. | |
// | |
int diff = (int)(targetPos - pos); | |
if (diff <= blockReader.available()) { | |
try { | |
pos += blockReader.skip(diff); | |
if (pos == targetPos) { | |
done = true; | |
} | |
} catch (IOException e) {//make following read to retry | |
if(DFSClient.LOG.isDebugEnabled()) { | |
DFSClient.LOG.debug("Exception while seek to " + targetPos | |
+ " from " + getCurrentBlock() + " of " + src + " from " | |
+ currentNode, e); | |
} | |
} | |
} | |
} | |
if (!done) { | |
pos = targetPos; | |
blockEnd = -1; | |
} | |
} | |
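
  // Note that when the in-buffer skip above cannot be used, seek() only
  // records the target position and sets blockEnd to -1; the actual
  // reconnect to a datanode happens lazily on the next read, via
  // blockSeekTo() in readWithStrategy().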

  /**
   * Same as {@link #seekToNewSource(long)} except that it does not exclude
   * the current datanode and might connect to the same node.
   */
  private synchronized boolean seekToBlockSource(long targetPos)
      throws IOException {
    currentNode = blockSeekTo(targetPos);
    return true;
  }

  /**
   * Seek to given position on a node other than the current node. If
   * a node other than the current node is found, then returns true.
   * If another node could not be found, then returns false.
   */
  @Override
  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
    boolean markedDead = deadNodes.containsKey(currentNode);
    addToDeadNodes(currentNode);
    DatanodeInfo oldNode = currentNode;
    DatanodeInfo newNode = blockSeekTo(targetPos);
    if (!markedDead) {
      /* Remove it from deadNodes. blockSeekTo could have cleared
       * deadNodes and added currentNode again. That's ok. */
      deadNodes.remove(oldNode);
    }
    if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
      currentNode = newNode;
      return true;
    } else {
      return false;
    }
  }

  /**
   * Return the current position in the stream.
   */
  @Override
  public synchronized long getPos() throws IOException {
    return pos;
  }

  /** Return the size of the remaining available bytes
   * if the size is less than or equal to {@link Integer#MAX_VALUE},
   * otherwise, return {@link Integer#MAX_VALUE}.
   */
  @Override
  public synchronized int available() throws IOException {
    if (closed) {
      throw new IOException("Stream closed");
    }
    final long remaining = getFileLength() - pos;
    return remaining <= Integer.MAX_VALUE ? (int) remaining : Integer.MAX_VALUE;
  }

  /**
   * We definitely don't support marks
   */
  @Override
  public boolean markSupported() {
    return false;
  }

  @Override
  public void mark(int readLimit) {
  }

  @Override
  public void reset() throws IOException {
    throw new IOException("Mark/reset not supported");
  }

  /**
   * Pick the best node from which to stream the data.
   * Entries in <i>nodes</i> are already in the priority order
   */
  static DatanodeInfo bestNode(DatanodeInfo nodes[],
      AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
      throws IOException {
    if (nodes != null) {
      for (int i = 0; i < nodes.length; i++) {
        if (!deadNodes.containsKey(nodes[i])) {
          return nodes[i];
        }
      }
    }
    throw new IOException("No live nodes contain current block");
  }

  /** Utility class to encapsulate data node info and its address. */
  static class DNAddrPair {
    DatanodeInfo info;
    InetSocketAddress addr;

    DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
      this.info = info;
      this.addr = addr;
    }
  }

  /**
   * Get statistics about the reads which this DFSInputStream has done.
   */
  public synchronized ReadStatistics getReadStatistics() {
    return new ReadStatistics(readStatistics);
  }
}
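
/* Usage sketch: applications normally reach DFSInputStream indirectly,
 * through the public Hadoop FileSystem API. A minimal example, assuming a
 * namenode at hdfs://namenode:8020 and a file at /data/file.txt (both
 * hypothetical):
 *
 *   Configuration conf = new Configuration();
 *   FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020/"), conf);
 *   FSDataInputStream in = fs.open(new Path("/data/file.txt")); // backed by a DFSInputStream
 *   byte[] buf = new byte[4096];
 *   int n;
 *   try {
 *     while ((n = in.read(buf)) > 0) {
 *       // process buf[0..n)
 *     }
 *   } finally {
 *     in.close();
 *   }
 */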