Skip to content

Instantly share code, notes, and snippets.

@yaooqinn
Created November 29, 2021 05:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c to your computer and use it in GitHub Desktop.
Save yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c to your computer and use it in GitHub Desktop.
// scalastyle:off line.size.limit
@ExpressionDescription(
usage = """_FUNC_(stageId) - Get task attemptNumber, and will throw FetchFailedException in the `stageId` Stage and make it retry.""",
examples = "",
since = "3.3.0",
group = "misc_funcs")
// scalastyle:on line.size.limit
case class TaskAttemptNumWithFailure(stage: Expression)
extends UnaryExpression with CodegenFallback {
override def foldable: Boolean = false
override def dataType: DataType = IntegerType
override def eval(input: InternalRow): Any = {
val id = stage.eval(input).asInstanceOf[Int]
val tc = TaskContext.get()
if (tc.stageAttemptNumber() == 0 &&
tc.stageId() == id &&
tc.attemptNumber() == 0 &&
tc.partitionId() == 0) {
val blockManagerId = SparkEnv.get.blockManager.shuffleServerId
val e = SparkCoreErrors.fetchFailedError(
blockManagerId, 0, 0L, 0, 0, "don't worry to see this")
throw e
} else {
TaskContext.get().attemptNumber()
}
}
override def prettyName: String = "task_num"
override def nullable: Boolean = false
override def child: Expression = stage
override protected def withNewChildInternal(newChild: Expression): Expression = {
copy(stage = newChild)
}
}
@yaooqinn
Copy link
Author

After fix

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment