-
-
Save yaooqinn/6acb7b74b343a6a6dffe8401f6b7b45c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// scalastyle:off line.size.limit | |
@ExpressionDescription( | |
usage = """_FUNC_(stageId) - Get task attemptNumber, and will throw FetchFailedException in the `stageId` Stage and make it retry.""", | |
examples = "", | |
since = "3.3.0", | |
group = "misc_funcs") | |
// scalastyle:on line.size.limit | |
/**
 * Returns the attempt number of the task that evaluates this expression, and deliberately
 * injects a fetch failure the first time the targeted stage runs so that the stage is retried.
 *
 * The failure is thrown only when all of the following hold for the current task:
 * stage attempt number is 0, the stage id equals the value produced by `stage`,
 * task attempt number is 0, and partition id is 0. In every other case the task's
 * attempt number is returned.
 *
 * @param stage expression producing the id of the stage in which the failure is injected;
 *              a non-null value is cast to `Int`, a null value never matches any stage
 */
case class TaskAttemptNumWithFailure(stage: Expression)
  extends UnaryExpression with CodegenFallback {

  // Never constant-fold: the result depends on the task that evaluates the expression.
  override def foldable: Boolean = false

  override def dataType: DataType = IntegerType

  /**
   * @param input the row the `stage` child expression is evaluated against
   * @return the current task's attempt number
   * @throws IllegalStateException if there is no active `TaskContext` on the calling thread
   */
  override def eval(input: InternalRow): Any = {
    // Keep the cast for non-null values so a wrongly typed child still fails loudly, but do
    // not let a null be unboxed to 0 (which would silently target stage 0).
    val targetStageId: Option[Int] = Option(stage.eval(input)).map(_.asInstanceOf[Int])

    // TaskContext.get() yields null when no task is running on this thread; fail with a
    // descriptive error instead of a bare NullPointerException.
    val tc = Option(TaskContext.get()).getOrElse {
      throw new IllegalStateException(
        s"$prettyName can only be evaluated inside a running Spark task (no active TaskContext)")
    }

    // Only the very first attempt of partition 0 in the first attempt of the target stage fails.
    val shouldInjectFailure =
      tc.stageAttemptNumber() == 0 &&
        targetStageId.contains(tc.stageId()) &&
        tc.attemptNumber() == 0 &&
        tc.partitionId() == 0

    if (shouldInjectFailure) {
      val blockManagerId = SparkEnv.get.blockManager.shuffleServerId
      // The zero ids are placeholders; the error is synthetic and exists only to trigger a retry.
      throw SparkCoreErrors.fetchFailedError(
        blockManagerId, 0, 0L, 0, 0, "don't worry to see this")
    } else {
      tc.attemptNumber()
    }
  }

  override def prettyName: String = "task_num"

  override def nullable: Boolean = false

  override def child: Expression = stage

  override protected def withNewChildInternal(newChild: Expression): Expression = {
    copy(stage = newChild)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
After fix