-
-
Save yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Fault-injection expression: returns the attempt number of the task that evaluates it and,
 * for exactly one task, throws a fetch-failed error instead.
 *
 * The error is thrown only when all of the following hold for the running task:
 *  - its stage id equals the value produced by `stage`,
 *  - the stage attempt number is 0,
 *  - the task attempt number is 0,
 *  - the partition id is 0.
 *
 * Every other task (including the retries of the failed one) gets its own attempt number back.
 *
 * @param stage expression producing the id of the stage in which the failure is injected;
 *              its result is cast to `Int` without any type check.
 */
// scalastyle:off line.size.limit
@ExpressionDescription(
  usage = """_FUNC_(stageId) - Get task attemptNumber, and will throw FetchFailedException in the `stageId` Stage and make it retry.""",
  examples = "",
  since = "3.3.0",
  group = "misc_funcs")
// scalastyle:on line.size.limit
case class TaskAttemptNumWithFailure(stage: Expression)
  extends UnaryExpression with CodegenFallback {

  // The result depends on the running task, so it must never be constant-folded.
  override def foldable: Boolean = false

  override def nullable: Boolean = false

  override def dataType: DataType = IntegerType

  override def prettyName: String = "task_num"

  override def child: Expression = stage

  /**
   * Evaluates the target stage id, then either injects the fetch failure or returns the
   * current task's attempt number.
   *
   * @throws IllegalStateException if there is no active `TaskContext` on the calling thread,
   *                               i.e. the expression is evaluated outside a running task.
   */
  override def eval(input: InternalRow): Any = {
    val id = stage.eval(input).asInstanceOf[Int]
    val tc = TaskContext.get()
    if (tc == null) {
      // Without this guard the dereferences below fail with a bare NullPointerException
      // that gives no hint about the actual cause.
      throw new IllegalStateException(
        s"$prettyName can only be evaluated inside a running Spark task, " +
          "but no active TaskContext was found")
    }

    // Restricting the failure to the very first attempt of a single partition makes it
    // happen exactly once, so the retried stage/task can complete.
    val shouldInjectFailure =
      tc.stageAttemptNumber() == 0 &&
        tc.stageId() == id &&
        tc.attemptNumber() == 0 &&
        tc.partitionId() == 0

    if (shouldInjectFailure) {
      val blockManagerId = SparkEnv.get.blockManager.shuffleServerId
      // The numeric arguments are placeholders; only the kind of error matters here.
      throw SparkCoreErrors.fetchFailedError(
        blockManagerId, 0, 0L, 0, 0, "don't worry to see this")
    } else {
      tc.attemptNumber()
    }
  }

  override protected def withNewChildInternal(newChild: Expression): Expression = {
    copy(stage = newChild)
  }
}
Author
yaooqinn
commented
Nov 29, 2021
Without Map Stage Failure
With the simple query above, we replaced the `stageId` argument of `task_num` with something like `task_num(300)`; the query then gave us the final results without any failures in the middle.
Its stage tab looks like below,
- stage 2 is skipped as it produces the same map output as stage 0
The DAG of Job 2 also shows that stage 2 is skipped.
With FetchFailedException and Map Stage Retries
When rerunning the spark-sql shell with the original SQL in https://gist.github.com/yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c#gistcomment-3977315, we observed the following:
- stage 3 threw FetchFailedException and caused itself and its parent stage(stage 2) to retry
- stage 2 was skipped before, but its attemptId was still 0, so when its retry happened it got removed from "Skipped Stages"
The DAG of Job 2 doesn't show that stage 2 is skipped anymore.
Besides, a retried stage usually has a subset of tasks from the original stage. If we mark it as an original one, the metrics might lead us into pitfalls.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment