// Created September 22, 2024 22:15
// Gist: kotlovs/437809684d7ebe3b7a93a1af804def8c (save it to your computer to use it in GitHub Desktop).
// This file contains bidirectional Unicode text that may be interpreted or compiled differently than
// what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
// implementation group: 'io.kubernetes', name: 'client-java'
import io.kubernetes.client.openapi.ApiResponse
import io.kubernetes.client.openapi.apis.CoreV1Api
import io.kubernetes.client.util.ClientBuilder
import org.apache.http.HttpStatus
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

import java.sql.Timestamp
import java.time.{Duration, Instant, LocalDateTime, ZoneOffset}
import java.util.concurrent.TimeUnit

import scala.annotation.tailrec
import scala.collection.JavaConverters._
/**
 * Restarts the Spark Connect (SC) driver pod once the service looks idle.
 *
 * [[run]] first checks the jobs that SC reports as running. If any of them was submitted more
 * than `longRunningJobThreshold` ago, an alert is sent and nothing is restarted. Otherwise the
 * monitor polls SC until there are no running jobs and no job has completed within the last
 * `latestCompletionTimeIntervalMins` minutes, and then deletes the driver pod through the
 * Kubernetes API. If no such moment shows up within `restartTimeoutHours`, an alert is sent
 * instead.
 *
 * All timestamps are handled as UTC `LocalDateTime` values.
 */
object SparkConnectMonitoring {

  implicit val formats: DefaultFormats.type = DefaultFormats

  private val baseScApiUrl = "https://spark-connect.joom.ai/api/v1"
  private val k8sNamespace = "spark-connect-service"
  private val scDriverPrefix = "spark-connect-service-"

  /** Polling period, and also the quiet period required after the latest job completion. */
  private val latestCompletionTimeIntervalMins = 4

  /** How long [[tryToRestart]] keeps waiting for an idle moment. */
  private val restartTimeoutHours = Duration.ofHours(3)

  /** A running job submitted longer ago than this is reported instead of restarting SC. */
  private val longRunningJobThreshold = Duration.ofHours(3)

  /** Orders timestamps on their own fields, independent of the JVM default time zone. */
  private val chronological: Ordering[LocalDateTime] =
    Ordering.fromLessThan[LocalDateTime]((a, b) => a.isBefore(b))

  /**
   * Entry point: alerts about long running jobs, otherwise waits for an idle moment and
   * restarts SC.
   */
  def run(): Unit = {
    val appId = getCurrentScAppId
    val cutoff = now().minus(longRunningJobThreshold)
    val longRunning = getJobs("running", appId).filter(_.submissionTime.isBefore(cutoff))
    if (longRunning.nonEmpty) {
      // The earliest long running job is also the earliest running job overall.
      val earliest = longRunning.map(_.submissionTime).min(chronological)
      SendAlert("SparkConnectMonitoring",
        s"There are long running jobs in SC. Earliest submissionTime - $earliest")
    } else {
      tryToRestart(appId, startTime = now(), timeout = restartTimeoutHours, restartAnyway = false)
    }
  }

  /**
   * Polls SC until it is idle and then restarts it.
   *
   * SC counts as idle when it has no running jobs and every known completion time is older than
   * `latestCompletionTimeIntervalMins` minutes. An application without any completed job is
   * idle as soon as nothing is running.
   *
   * @param appId         id of the SC application to inspect
   * @param startTime     UTC time at which the waiting started
   * @param timeout       maximum time to wait for an idle moment, counted from `startTime`
   * @param restartAnyway when the timeout expires: restart regardless (`true`) or only send an
   *                      alert (`false`)
   */
  @tailrec
  private def tryToRestart(appId: String, startTime: LocalDateTime, timeout: Duration, restartAnyway: Boolean): Unit = {
    val runningJobs = getJobs("running", appId)
    val completionTimes = (getJobs("succeeded", appId) ++ getJobs("failed", appId)).flatMap(_.completionTime)
    val quietSince = now().minusMinutes(latestCompletionTimeIntervalMins)
    // `forall` is true for an empty collection, so "no completed jobs yet" is not an error.
    val noRecentCompletion = completionTimes.forall(_.isBefore(quietSince))

    if (runningJobs.isEmpty && noRecentCompletion) {
      restartSparkConnect()
    } else if (now().isAfter(startTime.plus(timeout))) {
      if (restartAnyway)
        restartSparkConnect()
      else
        SendAlert("SparkConnectMonitoring", s"There was no suitable opportunity to restart SC in $timeout")
    } else {
      // Still waiting for the appropriate moment to restart SC.
      TimeUnit.MINUTES.sleep(latestCompletionTimeIntervalMins)
      tryToRestart(appId, startTime, timeout, restartAnyway)
    }
  }

  /**
   * Fetches the jobs of application `appId` that SC reports with the given `status`.
   *
   * @param status status filter passed to the jobs endpoint; it is also stored in each [[Job]]
   * @param appId  id of the SC application
   * @return one [[Job]] per element of the response; `completionTime` is always empty for
   *         `"running"` and otherwise holds the reported value when the job carries one
   */
  private def getJobs(status: String, appId: String): Seq[Job] = {
    val response = parse(HttpUtils.httpGet(baseScApiUrl + s"/applications/$appId/jobs?status=$status"))
    response.children.map { job =>
      val submissionTime = parseTs((job \ "submissionTime").extract[String])
      val completionTime = status match {
        case "running" => None
        // Any other status: take the completion time if present instead of failing the request.
        case _ => (job \ "completionTime").extractOpt[String].map(parseTs)
      }
      Job(status, submissionTime, completionTime)
    }
  }

  /** Returns the id of the single application listed by SC; fails if there is not exactly one. */
  private def getCurrentScAppId: String = {
    val apps = parse(HttpUtils.httpGet(baseScApiUrl + "/applications")).children
    assert(apps.size == 1, s"Expected exactly one SC application, but found ${apps.size}")
    (apps.head \ "id").extract[String]
  }

  /** Parses a timestamp into UTC after replacing any `GMT` marker with `Z` for `Instant.parse`. */
  private def parseTs(ts: String): LocalDateTime =
    LocalDateTime.ofInstant(Instant.parse(ts.replace("GMT", "Z")), ZoneOffset.UTC)

  /** Current time in UTC. */
  private def now(): LocalDateTime =
    LocalDateTime.now(ZoneOffset.UTC)

  /**
   * Sends a notification and deletes the running SC driver pod.
   *
   * The driver is the single pod in `k8sNamespace` whose name starts with `scDriverPrefix` and
   * whose phase is `Running`; the method fails if there is not exactly one such pod.
   */
  private def restartSparkConnect(): Unit = {
    SendAlert("SparkConnectMonitoring", "Restart SparkConnect")
    val k8sApi = new CoreV1Api(ClientBuilder.standard().build())
    val pods = getResult(k8sApi.listNamespacedPod(k8sNamespace).executeWithHttpInfo()).getItems.asScala
    // Metadata, name, status and phase are wrapped in Option so a pod lacking any of them is skipped.
    val driverPod = pods.filter { pod =>
      val name = Option(pod.getMetadata).flatMap(meta => Option(meta.getName))
      val phase = Option(pod.getStatus).flatMap(status => Option(status.getPhase))
      name.exists(_.startsWith(scDriverPrefix)) && phase.contains("Running")
    }
    assert(driverPod.size == 1, s"Can't find the SC driver pod.")
    val driverPodName = driverPod.head.getMetadata.getName

    // Try to kill the SC driver pod.
    getResult(k8sApi.deleteNamespacedPod(driverPodName, k8sNamespace)
      .gracePeriodSeconds(300)
      .executeWithHttpInfo())
  }

  /**
   * Unwraps a Kubernetes API response.
   *
   * Every 2xx status code counts as success: pod deletion may legitimately be answered with
   * `202 Accepted` rather than `200 OK`.
   *
   * @throws Exception if the status code is outside the 2xx range
   */
  private def getResult[T](result: ApiResponse[T]): T = {
    val code = result.getStatusCode
    if (code < HttpStatus.SC_OK || code >= HttpStatus.SC_MULTIPLE_CHOICES)
      throw new Exception(s"K8S api request returns a bad status code: $code")
    result.getData
  }
}
/**
 * A job as reported by the SC jobs endpoint.
 *
 * @param status         status the job was queried with (e.g. `"running"`, `"succeeded"`, `"failed"`)
 * @param submissionTime time the job was submitted, in UTC
 * @param completionTime time the job completed, in UTC; empty for running jobs
 */
case class Job(
  status: String,
  submissionTime: LocalDateTime,
  completionTime: Option[LocalDateTime]
)
// Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.