Created November 29, 2021 05:35
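// Imports assumed for a file living inside the Spark source tree (SparkCoreErrors is private[spark]).
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, IntegerType}

// scalastyle:off line.size.limit
@ExpressionDescription(
  usage = """_FUNC_(stageId) - Get task attemptNumber, and will throw FetchFailedException in the `stageId` Stage and make it retry.""",
  examples = "",
  since = "3.3.0",
  group = "misc_funcs")
// scalastyle:on line.size.limit
case class TaskAttemptNumWithFailure(stage: Expression)
  extends UnaryExpression with CodegenFallback {
  // Must be evaluated per task, never constant-folded at planning time.
  override def foldable: Boolean = false
  override def dataType: DataType = IntegerType
  override def eval(input: InternalRow): Any = {
    val id = stage.eval(input).asInstanceOf[Int]
    val tc = TaskContext.get()
    // Fail exactly once: first stage attempt, target stage, first task attempt, partition 0.
    if (tc.stageAttemptNumber() == 0 &&
        tc.stageId() == id &&
        tc.attemptNumber() == 0 &&
        tc.partitionId() == 0) {
      val blockManagerId = SparkEnv.get.blockManager.shuffleServerId
      val e = SparkCoreErrors.fetchFailedError(
        blockManagerId, 0, 0L, 0, 0, "don't worry to see this")
      throw e
    } else {
      // Assumption: the original else branch is missing here; returning the
      // task attempt number matches the usage string above.
      tc.attemptNumber()
    }
  }
  override def prettyName: String = "task_num"
  override def nullable: Boolean = false
  override def child: Expression = stage
  override protected def withNewChildInternal(newChild: Expression): Expression = {
    copy(stage = newChild)
  }
}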
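
The guard in `eval` decides which single task injects the failure. As a minimal sketch, it can be restated as a pure function that runs without Spark. `TaskInfo` and `FailureGuard` are hypothetical names introduced only for illustration; they are not part of the gist or of Spark.

```scala
// Hypothetical, Spark-free model of the guard in `eval`; not part of the gist.
final case class TaskInfo(
    stageId: Int,
    stageAttemptNumber: Int,
    attemptNumber: Int,
    partitionId: Int)

object FailureGuard {
  // True only for the first attempt of partition 0 in the first attempt
  // of the stage whose id equals targetStageId.
  def shouldFail(task: TaskInfo, targetStageId: Int): Boolean =
    task.stageAttemptNumber == 0 &&
      task.stageId == targetStageId &&
      task.attemptNumber == 0 &&
      task.partitionId == 0
}
```

Under this model, a target id that matches no running stage (such as 300) never satisfies the guard, so no failure is injected, and a retried stage attempt does not fail again.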

select * from
     (select v from (values (1), (2), (3) t(v))) t1 join
     (select task_num(3) from (select v from values (1), (2), (3) t(v) group by v)) t2;


yaooqinn commented Nov 29, 2021

Without Map Stage Failure

With the simple query above, we replaced the stageId passed to task_num with one that matches no stage, e.g. task_num(300). The query then returned its final results without any failure along the way.
Its Stages tab looks like this:


  1. Stage 2 is skipped because it produces the same map output as stage 0.

The DAG of Job 2 also shows that stage 2 is skipped.



With FetchFailedException and Map Stage Retries

When rerunning spark-sql shell with the original SQL in


  1. Stage 3 threw a FetchFailedException, which caused both itself and its parent stage (stage 2) to retry.
  2. Stage 2 had been skipped earlier, but its attemptId was still 0, so when its retry ran it was removed from Skipped Stages.
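
One way to picture the second point is a toy store keyed by (stageId, attemptId). This is only an illustrative sketch under the assumption that the skipped record and the first real run of the retry share the key (2, 0). `StageStore`, `Status`, and the method names are hypothetical and do not mirror Spark's actual UI code.

```scala
// Toy model only: all names here are hypothetical, not Spark's implementation.
sealed trait Status
case object Skipped extends Status
case object Running extends Status

object StageStore {
  // Entries are keyed by (stageId, attemptId), so records with the same key collide.
  type Key = (Int, Int)

  // Assumption: a skipped stage is recorded with attemptId 0.
  def markSkipped(store: Map[Key, Status], stageId: Int): Map[Key, Status] =
    store.updated((stageId, 0), Skipped)

  // Submitting an attempt overwrites whatever was stored under the same key.
  def submit(store: Map[Key, Status], stageId: Int, attemptId: Int): Map[Key, Status] =
    store.updated((stageId, attemptId), Running)

  // Ids of all stages that currently have a Skipped record.
  def skippedStages(store: Map[Key, Status]): Set[Int] =
    store.collect { case ((id, _), Skipped) => id }.toSet
}
```

In this model, calling `markSkipped` for stage 2 and then `submit` for stage 2 with attemptId 0 replaces the skipped record, so stage 2 no longer appears among the skipped stages.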

The DAG of Job 2 doesn't show that stage 2 is skipped anymore.


Besides, a retried stage usually runs only a subset of the original stage's tasks. If we present it as the original attempt, its metrics can be misleading.


After fix

