
@yaooqinn
Created November 29, 2021 05:35
```scala
// Imports added for context; this expression is written against Spark 3.3
// internals (catalyst expressions and core error helpers).
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionDescription, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types.{DataType, IntegerType}

// scalastyle:off line.size.limit
@ExpressionDescription(
  usage = """_FUNC_(stageId) - Get the task attemptNumber; throws FetchFailedException once in stage `stageId` to force that stage to retry.""",
  examples = "",
  since = "3.3.0",
  group = "misc_funcs")
// scalastyle:on line.size.limit
case class TaskAttemptNumWithFailure(stage: Expression)
  extends UnaryExpression with CodegenFallback {

  override def foldable: Boolean = false

  override def dataType: DataType = IntegerType

  override def eval(input: InternalRow): Any = {
    val id = stage.eval(input).asInstanceOf[Int]
    val tc = TaskContext.get()
    // Fail exactly once: on the first attempt of partition 0 of the target stage.
    if (tc.stageAttemptNumber() == 0 &&
        tc.stageId() == id &&
        tc.attemptNumber() == 0 &&
        tc.partitionId() == 0) {
      val blockManagerId = SparkEnv.get.blockManager.shuffleServerId
      throw SparkCoreErrors.fetchFailedError(
        blockManagerId, 0, 0L, 0, 0, "don't worry to see this")
    } else {
      tc.attemptNumber()
    }
  }

  override def prettyName: String = "task_num"

  override def nullable: Boolean = false

  override def child: Expression = stage

  override protected def withNewChildInternal(newChild: Expression): Expression =
    copy(stage = newChild)
}
```
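The one-shot trigger condition inside `eval` can be isolated as a pure predicate, which makes it easier to see that the exception fires exactly once — on the first attempt of partition 0 of the target stage — so every retry computes normally (a standalone sketch, outside Spark):

```scala
// The trigger condition from eval, extracted as a pure predicate.
// It holds only for the very first attempt of partition 0 of the
// target stage; any retry (stage or task) evaluates normally.
def shouldFailFetch(
    targetStageId: Int,
    stageId: Int,
    stageAttemptNumber: Int,
    taskAttemptNumber: Int,
    partitionId: Int): Boolean =
  stageAttemptNumber == 0 &&
    stageId == targetStageId &&
    taskAttemptNumber == 0 &&
    partitionId == 0
```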
```sql
select * from
     (select v from (values (1), (2), (3) t(v))) t1
     join
     (select task_num(3) from (select v from values (1), (2), (3) t(v) group by v)) t2;
```

yaooqinn commented Nov 29, 2021

Without Map Stage Failure

With the simple query above, we replaced the stageId in task_num with a value that matches no stage, e.g. task_num(300); the query then returned its final results without any failures along the way.
Its Stages tab looks like this:

[Screenshot: Stages tab, with stage 2 listed under Skipped Stages]

  1. stage 2 is skipped because it would produce the same map output as stage 0

The DAG of Job 2 also shows that stage 2 is skipped.

[Screenshot: DAG of Job 2, showing stage 2 as skipped]

With FetchFailedException and Map Stage Retries

Rerunning the spark-sql shell with the original SQL from https://gist.github.com/yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c#gistcomment-3977315 gives:

[Screenshot: Stages tab after the FetchFailedException, with stage 2 and stage 3 retried]

  1. stage 3 threw FetchFailedException, which caused it and its parent stage (stage 2) to retry
  2. stage 2 had been skipped earlier, but its attemptId was still 0, so when its retry ran it was removed from Skipped Stages
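The bookkeeping in step 2 can be sketched with a toy model (an illustration only, not Spark's actual DAGScheduler code): a stage sits in the skipped set with its attemptId still 0, and resubmission after a fetch failure pulls it back out.

```scala
import scala.collection.mutable

// Toy model of the scheduler bookkeeping described above
// (an illustration, not Spark's actual DAGScheduler code).
final case class Stage(id: Int) { var attemptId: Int = 0 }

object StageTracker {
  val skipped = mutable.Set.empty[Int]

  // A stage whose map output already exists is marked as skipped.
  def markSkipped(s: Stage): Unit = skipped += s.id

  // A FetchFailedException in a child stage resubmits the parent map stage:
  // it leaves the skipped set even though its attemptId was still 0
  // the whole time it sat there.
  def resubmit(s: Stage): Unit = {
    skipped -= s.id
    s.attemptId += 1
  }
}
```

In the UI terms above: stage 2 first appears under Skipped Stages, and stage 3's fetch failure triggers a resubmission that removes it from that list.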

The DAG of Job 2 doesn't show that stage 2 is skipped anymore.

[Screenshot: DAG of Job 2, with stage 2 no longer marked as skipped]

Besides, a retried stage usually reruns only a subset of the original stage's tasks. If we present it as if it were the original stage, its metrics can be misleading.
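As a toy illustration of that pitfall (the numbers are assumed, not taken from the run above): if only one of three tasks failed, the retry attempt reruns just that task, so its metrics cover only a fraction of the stage's work.

```scala
// Assumed numbers, for illustration only.
final case class TaskRun(taskId: Int, recordsRead: Long)

// Attempt 0 ran all three tasks; only task 0 hit the FetchFailedException.
val attempt0 = Seq(TaskRun(0, 100L), TaskRun(1, 100L), TaskRun(2, 100L))
// The retry (attempt 1) reruns just the failed task.
val attempt1 = Seq(TaskRun(0, 100L))

val originalRecords = attempt0.map(_.recordsRead).sum // 300
val retryRecords    = attempt1.map(_.recordsRead).sum // 100
// Reporting the retry as if it were the original stage would make the
// stage appear to have read a third of the records it actually did.
```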

After fix

[Screenshot: Stages tab after the fix, with stage 2 shown as skipped again]
