Created
April 8, 2019 03:41
-
-
Save tanghaodong25/00103ce9515b9245a6f40f9f73ba4856 to your computer and use it in GitHub Desktop.
Spark patch: log per-block shuffle read timing (read block time, schedule time, fetch time) in ShuffleBlockFetcherIterator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | |
index 94def4d31c..4f45e2d454 100644 | |
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | |
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala | |
@@ -100,7 +100,7 @@ final class ShuffleBlockFetcherIterator( | |
* A queue to hold our results. This turns the asynchronous model provided by | |
* [[org.apache.spark.network.BlockTransferService]] into a synchronous model (iterator). | |
*/ | |
- private[this] val results = new LinkedBlockingQueue[FetchResult] | |
+ private[this] val results = new LinkedBlockingQueue[(FetchResult, Long, Long)] | |
/** | |
* Current [[FetchResult]] being processed. We track this so we can release the current buffer | |
@@ -191,7 +191,8 @@ final class ShuffleBlockFetcherIterator( | |
// Release buffers in the results queue | |
val iter = results.iterator() | |
while (iter.hasNext) { | |
- val result = iter.next() | |
+ val tuple = iter.next() | |
+ val result = tuple._1 | |
result match { | |
case SuccessFetchResult(_, address, _, buf, _) => | |
if (address != blockManager.blockManagerId) { | |
@@ -223,6 +224,7 @@ final class ShuffleBlockFetcherIterator( | |
val remainingBlocks = new HashSet[String]() ++= sizeMap.keys | |
val blockIds = req.blocks.map(_._1.toString) | |
val address = req.address | |
+ val sendRequesTime = System.currentTimeMillis() | |
val blockFetchingListener = new BlockFetchingListener { | |
override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = { | |
@@ -232,10 +234,11 @@ final class ShuffleBlockFetcherIterator( | |
if (!isZombie) { | |
// Increment the ref count because we need to pass this to a different thread. | |
// This needs to be released after use. | |
+ val getRequestTime = System.currentTimeMillis() | |
buf.retain() | |
remainingBlocks -= blockId | |
- results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf, | |
- remainingBlocks.isEmpty)) | |
+ results.put((new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf, | |
+ remainingBlocks.isEmpty), sendRequesTime, getRequestTime)) | |
logDebug("remainingBlocks: " + remainingBlocks) | |
} | |
} | |
@@ -244,7 +247,7 @@ final class ShuffleBlockFetcherIterator( | |
override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = { | |
logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e) | |
- results.put(new FailureFetchResult(BlockId(blockId), address, e)) | |
+ results.put((new FailureFetchResult(BlockId(blockId), address, e), 0, 0)) | |
} | |
} | |
@@ -326,17 +329,19 @@ final class ShuffleBlockFetcherIterator( | |
while (iter.hasNext) { | |
val blockId = iter.next() | |
try { | |
+ val startGetBlockTime = System.currentTimeMillis() | |
val buf = blockManager.getBlockData(blockId) | |
shuffleMetrics.incLocalBlocksFetched(1) | |
shuffleMetrics.incLocalBytesRead(buf.size) | |
buf.retain() | |
- results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId, | |
- buf.size(), buf, false)) | |
+ val endGetBlockTime = System.currentTimeMillis() | |
+ results.put((new SuccessFetchResult(blockId, blockManager.blockManagerId, | |
+ buf.size(), buf, false), startGetBlockTime, endGetBlockTime)) | |
} catch { | |
case e: Exception => | |
// If we see an exception, stop immediately. | |
logError(s"Error occurred while fetching local blocks", e) | |
- results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e)) | |
+ results.put((new FailureFetchResult(blockId, blockManager.blockManagerId, e), 0, 0)) | |
return | |
} | |
} | |
@@ -390,19 +395,27 @@ final class ShuffleBlockFetcherIterator( | |
// For local shuffle block, throw FailureFetchResult for the first IOException. | |
while (result == null) { | |
val startFetchWait = System.currentTimeMillis() | |
- result = results.take() | |
+ val tuple = results.take() | |
+ result = tuple._1 | |
val stopFetchWait = System.currentTimeMillis() | |
+ val read_block_time = stopFetchWait - startFetchWait | |
+ val fetch_time = tuple._3 - tuple._2 | |
+ val schedule_time = startFetchWait - tuple._3 | |
+ | |
shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait) | |
result match { | |
case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) => | |
if (address != blockManager.blockManagerId) { | |
+ logInfo("remote block, block size " + size + " read block time: " + read_block_time + " schedule time " + schedule_time + " fetch time " + fetch_time) | |
numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1 | |
shuffleMetrics.incRemoteBytesRead(buf.size) | |
if (buf.isInstanceOf[FileSegmentManagedBuffer]) { | |
shuffleMetrics.incRemoteBytesReadToDisk(buf.size) | |
} | |
shuffleMetrics.incRemoteBlocksFetched(1) | |
+ } else { | |
+ logInfo("local block, block size " + size + " read block time: " + read_block_time + " schedule time " + schedule_time + " fetch time " + fetch_time) | |
} | |
if (!localBlocks.contains(blockId)) { | |
bytesInFlight -= size | |
diff --git a/pom.xml b/pom.xml | |
index 7c6bb4bcc8..012d9911dc 100644 | |
--- a/pom.xml | |
+++ b/pom.xml | |
@@ -2491,6 +2491,7 @@ | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-source-plugin</artifactId> | |
</plugin> | |
+<!-- | |
<plugin> | |
<groupId>org.scalastyle</groupId> | |
<artifactId>scalastyle-maven-plugin</artifactId> | |
@@ -2515,6 +2516,7 @@ | |
</execution> | |
</executions> | |
</plugin> | |
+ --> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-checkstyle-plugin</artifactId> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment