Skip to content

Instantly share code, notes, and snippets.

@tanghaodong25
Created April 8, 2019 03:41
Show Gist options
  • Save tanghaodong25/00103ce9515b9245a6f40f9f73ba4856 to your computer and use it in GitHub Desktop.
Save tanghaodong25/00103ce9515b9245a6f40f9f73ba4856 to your computer and use it in GitHub Desktop.
Spark shuffle read: log per-block read, schedule, and fetch times
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 94def4d31c..4f45e2d454 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -100,7 +100,7 @@ final class ShuffleBlockFetcherIterator(
* A queue to hold our results. This turns the asynchronous model provided by
* [[org.apache.spark.network.BlockTransferService]] into a synchronous model (iterator).
*/
- private[this] val results = new LinkedBlockingQueue[FetchResult]
+ private[this] val results = new LinkedBlockingQueue[(FetchResult, Long, Long)]
/**
* Current [[FetchResult]] being processed. We track this so we can release the current buffer
@@ -191,7 +191,8 @@ final class ShuffleBlockFetcherIterator(
// Release buffers in the results queue
val iter = results.iterator()
while (iter.hasNext) {
- val result = iter.next()
+ val tuple = iter.next()
+ val result = tuple._1
result match {
case SuccessFetchResult(_, address, _, buf, _) =>
if (address != blockManager.blockManagerId) {
@@ -223,6 +224,7 @@ final class ShuffleBlockFetcherIterator(
val remainingBlocks = new HashSet[String]() ++= sizeMap.keys
val blockIds = req.blocks.map(_._1.toString)
val address = req.address
+ val sendRequesTime = System.currentTimeMillis()
val blockFetchingListener = new BlockFetchingListener {
override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
@@ -232,10 +234,11 @@ final class ShuffleBlockFetcherIterator(
if (!isZombie) {
// Increment the ref count because we need to pass this to a different thread.
// This needs to be released after use.
+ val getRequestTime = System.currentTimeMillis()
buf.retain()
remainingBlocks -= blockId
- results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
- remainingBlocks.isEmpty))
+ results.put((new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
+ remainingBlocks.isEmpty), sendRequesTime, getRequestTime))
logDebug("remainingBlocks: " + remainingBlocks)
}
}
@@ -244,7 +247,7 @@ final class ShuffleBlockFetcherIterator(
override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
- results.put(new FailureFetchResult(BlockId(blockId), address, e))
+ results.put((new FailureFetchResult(BlockId(blockId), address, e), 0, 0))
}
}
@@ -326,17 +329,19 @@ final class ShuffleBlockFetcherIterator(
while (iter.hasNext) {
val blockId = iter.next()
try {
+ val startGetBlockTime = System.currentTimeMillis()
val buf = blockManager.getBlockData(blockId)
shuffleMetrics.incLocalBlocksFetched(1)
shuffleMetrics.incLocalBytesRead(buf.size)
buf.retain()
- results.put(new SuccessFetchResult(blockId, blockManager.blockManagerId,
- buf.size(), buf, false))
+ val endGetBlockTime = System.currentTimeMillis()
+ results.put((new SuccessFetchResult(blockId, blockManager.blockManagerId,
+ buf.size(), buf, false), startGetBlockTime, endGetBlockTime))
} catch {
case e: Exception =>
// If we see an exception, stop immediately.
logError(s"Error occurred while fetching local blocks", e)
- results.put(new FailureFetchResult(blockId, blockManager.blockManagerId, e))
+ results.put((new FailureFetchResult(blockId, blockManager.blockManagerId, e), 0, 0))
return
}
}
@@ -390,19 +395,27 @@ final class ShuffleBlockFetcherIterator(
// For local shuffle block, throw FailureFetchResult for the first IOException.
while (result == null) {
val startFetchWait = System.currentTimeMillis()
- result = results.take()
+ val tuple = results.take()
+ result = tuple._1
val stopFetchWait = System.currentTimeMillis()
+ val read_block_time = stopFetchWait - startFetchWait
+ val fetch_time = tuple._3 - tuple._2
+ val schedule_time = startFetchWait - tuple._3
+
shuffleMetrics.incFetchWaitTime(stopFetchWait - startFetchWait)
result match {
case r @ SuccessFetchResult(blockId, address, size, buf, isNetworkReqDone) =>
if (address != blockManager.blockManagerId) {
+ logInfo("remote block, block size " + size + " read block time: " + read_block_time + " schedule time " + schedule_time + " fetch time " + fetch_time)
numBlocksInFlightPerAddress(address) = numBlocksInFlightPerAddress(address) - 1
shuffleMetrics.incRemoteBytesRead(buf.size)
if (buf.isInstanceOf[FileSegmentManagedBuffer]) {
shuffleMetrics.incRemoteBytesReadToDisk(buf.size)
}
shuffleMetrics.incRemoteBlocksFetched(1)
+ } else {
+ logInfo("local block, block size " + size + " read block time: " + read_block_time + " schedule time " + schedule_time + " fetch time " + fetch_time)
}
if (!localBlocks.contains(blockId)) {
bytesInFlight -= size
diff --git a/pom.xml b/pom.xml
index 7c6bb4bcc8..012d9911dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2491,6 +2491,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
</plugin>
+<!--
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
@@ -2515,6 +2516,7 @@
</execution>
</executions>
</plugin>
+ -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment