Originally created by UC Berkeley's AMPLab, Apache Spark is an open-source cluster-computing framework now maintained by the Apache Software Foundation. Mostly written in Scala, it is designed to be a fast, unified analytics engine for large-scale data, and it is used and invested in by many high-profile and large-scale companies.
Discovered by Chinese researcher Fengwei Zhang of the Alibaba Cloud Security Team, the vulnerability lies in the REST API: in standalone mode, a CreateSubmissionRequest can be abused to let users submit malicious code, resulting in remote code execution. A Metasploit module was submitted on November 12, 2018, and this is how our analysis began.
According to Apache Spark's security issue list, this vulnerability is referred to as CVE-2018-11770.
According to the vendor, versions from 1.3.0 onward are affected when running a standalone master with the REST API enabled, or a Mesos master with cluster mode enabled.
For testing purposes, the vulnerable version of Apache Spark can be installed as a Docker container by performing the following:
$ git clone https://github.com/vulhub/vulhub.git
$ cd vulhub/spark/unacc
$ docker-compose up -d
For manual installation, a walkthrough for the Windows version can be found here
The release notes of Apache Spark indicate that the vulnerability was patched in version 2.4.0, tracked in Jira as [SPARK-25088]. The fix is pull request #22071, with the following specifics:
- REST submission server is disabled by default in standalone mode.
- Fails the standalone master if REST server enabled and authentication secret set.
- Fails the mesos cluster dispatcher if authentication secret set.
- When submitting a standalone application, only try the REST submission first if spark.master.rest.enabled=true (in the config file).
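The first two bullet points can be modeled roughly as follows. This is a minimal sketch, not Spark's actual code: a plain Map stands in for SparkConf, spark.master.rest.enabled comes from the patch notes above, spark.authenticate.secret is assumed to be the "authentication secret" setting the notes refer to, and the object and method names are hypothetical.

```scala
// Hypothetical model of the post-patch startup checks; not Spark source code.
object RestPolicySketch {
  val RestEnabledKey = "spark.master.rest.enabled"
  // Assumption: this is the "authentication secret" setting the patch notes mean.
  val AuthSecretKey = "spark.authenticate.secret"

  /** Right(true): start the REST server. Right(false): leave it off.
    * Left(reason): refuse to start the standalone master. */
  def standaloneMasterDecision(conf: Map[String, String]): Either[String, Boolean] = {
    // After the patch the REST server is off unless explicitly enabled.
    val restEnabled = conf.get(RestEnabledKey).exists(_.trim.equalsIgnoreCase("true"))
    val secretSet = conf.contains(AuthSecretKey)
    if (restEnabled && secretSet) {
      Left("REST submission server is enabled while an authentication secret is set")
    } else {
      Right(restEnabled)
    }
  }
}
```

With an empty configuration the sketch returns Right(false), which mirrors the new "disabled by default" behavior.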
Because Apache Spark is quite well documented (release notes, Jira tickets, pull requests, and so on), it doesn't take long to identify the code responsible for the insecure submission based on the patch: the StandaloneRestServer class.
The StandaloneRestServer class extends the abstract RestSubmissionServer class, so we need to take a look at that first. The beginning of this class tells us how the URLs are mapped:
protected val baseContext = s"/${RestSubmissionServer.PROTOCOL_VERSION}/submissions"
protected lazy val contextToServlet = Map[String, RestServlet](
  s"$baseContext/create/*" -> submitRequestServlet,
  s"$baseContext/kill/*" -> killRequestServlet,
  s"$baseContext/status/*" -> statusRequestServlet,
  "/*" -> new ErrorServlet // default handler
)
The above tells us that when the server sees a request in the following format, it is routed to submitRequestServlet:
/v1/submissions/create
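To make the mapping concrete, here is a simplified model of prefix-based routing. It is only a sketch: handlers are represented by their names as plain strings, the protocol version "v1" is taken from the example URL above, and the route function is a hypothetical stand-in for what the servlet container does with the "/*" patterns.

```scala
// Simplified model of the URL-to-servlet mapping; handlers are just names here.
object RoutingSketch {
  val ProtocolVersion = "v1" // taken from the example URL /v1/submissions/create
  val baseContext = s"/$ProtocolVersion/submissions"

  val contextToHandler: Seq[(String, String)] = Seq(
    s"$baseContext/create" -> "submitRequestServlet",
    s"$baseContext/kill" -> "killRequestServlet",
    s"$baseContext/status" -> "statusRequestServlet"
  )

  // A "/prefix/*" pattern matches the prefix itself and anything below it.
  def route(path: String): String =
    contextToHandler
      .collectFirst {
        case (prefix, handler) if path == prefix || path.startsWith(prefix + "/") => handler
      }
      .getOrElse("ErrorServlet") // default handler, like the "/*" entry
}
```

For example, RoutingSketch.route("/v1/submissions/status/driver-1") resolves to the status handler, while any unrelated path falls through to the error handler.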
Inside the submitRequestServlet class, there is a doPost function:
protected override def doPost(
    requestServlet: HttpServletRequest,
    responseServlet: HttpServletResponse): Unit = {
  val responseMessage =
    try {
      val requestMessageJson = Source.fromInputStream(requestServlet.getInputStream).mkString
      val requestMessage = SubmitRestProtocolMessage.fromJson(requestMessageJson)
      // The response should have already been validated on the client.
      // In case this is not true, validate it ourselves to avoid potential NPEs.
      requestMessage.validate()
      handleSubmit(requestMessageJson, requestMessage, responseServlet)
    } catch {
      // The client failed to provide a valid JSON, so this is not our fault
      case e @ (_: JsonProcessingException | _: SubmitRestProtocolException) =>
        responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
        handleError("Malformed request: " + formatException(e))
    }
  sendResponse(responseMessage, responseServlet)
}
Basically, this reads the data from the stream, parses and validates it, and then passes it to a handleSubmit function. Whoever extends RestSubmissionServer has to implement handleSubmit.
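The division of labor here is the classic template-method pattern: the base class owns the request flow, and subclasses only supply handleSubmit. A minimal sketch of that shape follows. All types are simplified stand-ins rather than Spark's classes, and the "action=resource" body format is a toy replacement for the real JSON parsing.

```scala
object ServletSketch {
  // Simplified stand-ins for the request and response message types.
  final case class Request(action: String, appResource: String)
  final case class Response(status: Int, message: String)

  abstract class SubmitServletSketch {
    // Subclasses decide what a valid submission actually does.
    protected def handleSubmit(request: Request): Response

    // Shared flow: parse, validate, then delegate to handleSubmit.
    def doPost(body: String): Response =
      parse(body) match {
        case Some(req) if req.appResource.nonEmpty => handleSubmit(req)
        case _ => Response(400, "Malformed request")
      }

    // Toy parser for "action=resource"; the real code parses JSON.
    private def parse(body: String): Option[Request] =
      body.split("=", 2) match {
        case Array(action, resource) => Some(Request(action, resource))
        case _ => None
      }
  }

  // A hypothetical concrete subclass that simply accepts the submission.
  class EchoServlet extends SubmitServletSketch {
    protected def handleSubmit(request: Request): Response =
      Response(200, s"accepted ${request.appResource}")
  }
}
```

The point of the design is that anything interesting about a submission happens in the subclass, which is why the analysis moves there next.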
Now that we have a basic understanding of the abstract class, we can look at the subclasses. StandaloneRestServer is one of those that extends RestSubmissionServer, and it seems to fit the description of the problem.
private[deploy] class StandaloneRestServer(
    host: String,
    requestedPort: Int,
    masterConf: SparkConf,
    masterEndpoint: RpcEndpointRef,
    masterUrl: String)
  extends RestSubmissionServer(host, requestedPort, masterConf)
It's also very easy to identify what we should be looking at because of this line in the code:
protected override val submitRequestServlet =
  new StandaloneSubmitRequestServlet(masterEndpoint, masterUrl, masterConf)
In StandaloneSubmitRequestServlet, we find the handleSubmit code we need:
protected override def handleSubmit(
    requestMessageJson: String,
    requestMessage: SubmitRestProtocolMessage,
    responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
  requestMessage match {
    case submitRequest: CreateSubmissionRequest =>
      val driverDescription = buildDriverDescription(submitRequest)
      val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](
        DeployMessages.RequestSubmitDriver(driverDescription))
      val submitResponse = new CreateSubmissionResponse
      submitResponse.serverSparkVersion = sparkVersion
      submitResponse.message = response.message
      submitResponse.success = response.success
      submitResponse.submissionId = response.driverId.orNull
      val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
      if (unknownFields.nonEmpty) {
        // If there are fields that the server does not know about, warn the client
        submitResponse.unknownFields = unknownFields
      }
      submitResponse
    case unexpected =>
      responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
      handleError(s"Received message of unexpected type ${unexpected.messageType}.")
  }
}
Looking at the code, the buildDriverDescription function seems a little interesting. First off, it reveals what the appResource parameter means. The function is a bit large to post in full, but it basically starts off this way:
val appResource = Option(request.appResource).getOrElse {
  throw new SubmitRestMissingFieldException("Application jar is missing.")
}
It is then passed to DriverDescription toward the end of the code:
new DriverDescription(appResource, actualDriverMemory, actualDriverCores, actualSuperviseDriver, command)
When we look at the DriverDescription class, we understand the purpose of appResource, which is a URL to a JAR file:
private[deploy] case class DriverDescription(
    jarUrl: String,
    mem: Int,
    cores: Int,
    supervise: Boolean,
    command: Command)
At this point, it basically means that when we create a submission request, we can point it at a JAR hosted on a remote server, which pretty much dictates the attack vector.
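The path from appResource to jarUrl can be condensed into a few lines. This is a stripped-down sketch under stated assumptions: SubmissionRequest, DriverDescriptionLite, and MissingFieldException are hypothetical simplified types, and every field not discussed above is omitted.

```scala
object DriverDescriptionSketch {
  // Simplified request carrying only the fields discussed in the text.
  final case class SubmissionRequest(appResource: String, mainClass: String)
  // Mirrors the jarUrl field of DriverDescription; other fields are omitted.
  final case class DriverDescriptionLite(jarUrl: String, mainClass: String)

  final class MissingFieldException(msg: String) extends Exception(msg)

  def build(request: SubmissionRequest): DriverDescriptionLite = {
    // Option(null) is None, so a missing field triggers the exception.
    val appResource = Option(request.appResource).getOrElse {
      throw new MissingFieldException("Application jar is missing.")
    }
    // In this sketch the URL is passed through unchanged, with no check on where it points.
    DriverDescriptionLite(jarUrl = appResource, mainClass = request.mainClass)
  }
}
```

The only validation in the sketch is the presence check, so whatever URL the client supplies becomes the JAR location.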
Back in the buildDriverDescription function, we also get clues about how the JAR is launched:
val command = new Command(
  "org.apache.spark.deploy.worker.DriverWrapper",
  Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs,
  environmentVariables, extraClassPath, extraLibraryPath, javaOpts)
The Command class is defined as follows:
private[spark] case class Command(
    mainClass: String,
    arguments: Seq[String],
    environment: Map[String, String],
    classPathEntries: Seq[String],
    libraryPathEntries: Seq[String],
    javaOpts: Seq[String]) {
}
We see that the first argument is the main class, which in this case is org.apache.spark.deploy.worker.DriverWrapper, and it's basically meant to invoke our main method:
val clazz = Utils.classForName(mainClass)
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])
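The three lines above are plain Java reflection: load a class by name, look up a static method, and invoke it with a null receiver. As a harmless, self-contained illustration of the same mechanism, the sketch below calls java.lang.Integer.parseInt instead of a user-supplied main. The object and helper names are hypothetical.

```scala
object ReflectiveInvokeSketch {
  // Looks up a public static method by name and calls it with one String argument.
  def invokeStatic(className: String, methodName: String, arg: String): AnyRef = {
    val clazz = Class.forName(className)
    val method = clazz.getMethod(methodName, classOf[String])
    // A null receiver is allowed because the method is static.
    method.invoke(null, arg)
  }
}
```

For instance, ReflectiveInvokeSketch.invokeStatic("java.lang.Integer", "parseInt", "42") returns the boxed integer 42. The important property is that the class name is just a string, so whoever controls that string controls which code runs.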
After the driver description is crafted, it is sent to the master endpoint wrapped in a RequestSubmitDriver message:
val response = masterEndpoint.askSync[DeployMessages.SubmitDriverResponse](DeployMessages.RequestSubmitDriver(driverDescription))
RequestSubmitDriver is handled in the receiveAndReply function in Master.scala:
... code ...
case RequestSubmitDriver(description) =>
  if (state != RecoveryState.ALIVE) {
    val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
      "Can only accept driver submissions in ALIVE state."
    context.reply(SubmitDriverResponse(self, false, None, msg))
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()
... code ...
In the else block, we can see that a driver built from our description is added to a collection of waiting drivers, and then schedule() is called. Let's take a look at what that does:
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}
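The placement loop is easier to follow on a toy model. The sketch below keeps the same round-robin logic but makes two simplifying assumptions: workers are visited in the given order instead of being shuffled, and a worker's free resources are not reduced after a placement. Worker, Driver, and place are hypothetical names.

```scala
object ScheduleSketch {
  final case class Worker(id: String, memoryFree: Int, coresFree: Int)
  final case class Driver(id: String, mem: Int, cores: Int)

  /** Returns driver id -> worker id for every driver that could be placed. */
  def place(workers: Seq[Worker], waiting: Seq[Driver]): Map[String, String] = {
    val n = workers.size
    var curPos = 0 // shared across drivers, so placements rotate through the workers
    var placed = Map.empty[String, String]
    for (driver <- waiting) {
      var launched = false
      var visited = 0
      // Visit each worker at most once per driver.
      while (visited < n && !launched) {
        val w = workers(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          placed += (driver.id -> w.id)
          launched = true
        }
        curPos = (curPos + 1) % n
      }
    }
    placed
  }
}
```

A driver that fits on no worker is simply left out of the result, which corresponds to it staying in the waiting list. For our purposes, the takeaway is that any submitted driver with modest resource requirements will be handed to some live worker.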
Notice that this function calls something called launchDriver, which sounds interesting. Let's keep looking:
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
So the code basically relies on the Worker class to launch the driver, and we can find this in Worker.scala:
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()
Oh, I wonder what driver.start() does. Since this is a DriverRunner instance, that's where we find the code.
The purpose of the DriverRunner is pretty self-explanatory, so it needs no introduction.
The Worker class tells us we should be looking at the start() function, but there is a lot of code for process setup and handling, so let's go straight to the point. Tracing from start() leads you down this path:
start() -> prepareAndRunDriver() -> runDriver() -> runCommandWithRetry()
The runCommandWithRetry function finally executes our code using ProcessBuilder:
private[worker] def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
  ... code ...
  synchronized {
    if (killed) { return exitCode }
    process = Some(command.start())
    initialize(process.get)
  }
  ... code ...
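Since most of that function is elided, here is a rough sketch of the two ideas its name suggests: starting a child process with ProcessBuilder, and re-running it while supervision is on and it keeps failing. This is an illustration under assumptions, not Spark's implementation: runOnce and runWithRetry are hypothetical helpers, maxAttempts is an invented safety cap, and the retry condition (supervised and non-zero exit code) is a simplification.

```scala
object RunWithRetrySketch {
  // Starts a child process with ProcessBuilder and waits for its exit code.
  def runOnce(command: Seq[String]): Int = {
    val builder = new ProcessBuilder(command: _*)
    // Share stdout/stderr with this JVM so the child cannot block on a full pipe.
    builder.inheritIO()
    builder.start().waitFor()
  }

  // Re-runs `attempt` while supervision is on and the exit code is non-zero.
  // maxAttempts is a hypothetical safety cap for this sketch.
  def runWithRetry(attempt: () => Int, supervise: Boolean, maxAttempts: Int): Int = {
    var exitCode = attempt()
    var tries = 1
    while (supervise && exitCode != 0 && tries < maxAttempts) {
      exitCode = attempt()
      tries += 1
    }
    exitCode
  }
}
```

A harmless way to try it is to launch the JVM's own java binary, for example RunWithRetrySketch.runWithRetry(() => RunWithRetrySketch.runOnce(Seq(System.getProperty("java.home") + "/bin/java", "-version")), supervise = false, maxAttempts = 1).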
Now that we have found the code that executes our payload, we should have a basic understanding of what the execution flow looks like, which is the end of the analysis.
To exploit this, the Metasploit module relies on two mixins: HttpClient and HttpServer. These two mixins can be tricky to use together, but luckily there is a write-up explaining how to do so, which can be found here.
The HttpServer mixin is used to host the payload, and the HttpClient mixin sends a submission request to Apache Spark's REST API server, which instructs it to download the payload. Although the module is quite reliable, because this is more about abusing a feature than some kind of memory corruption, it can leave quite a few artifacts on the target system. The most obvious one is the malicious driver that can be seen in the master GUI.
Metasploit has a soft policy that prefers modules to use randomness whenever they can, for example by randomizing the payload name, path, padding, etc. However, the randomness actually makes the payload stand out a lot on the master GUI list, which makes this soft policy somewhat debatable.
Although we're here to audit for a vulnerability in Apache Spark, it doesn't really make us think less of the product. In fact, it's quite the opposite. Apache Spark displays a lot of good software development habits that other people should follow. For example, throughout the reviewing process, we noticed that:
- Apache Spark's release notes are well written, and link to references.
- They use Jira for tracking tickets, and they seem to make sure if there's a pull request, the PR # is attached to it.
- Developers take the time to write good descriptions for pull requests, and they try to make sure the reviewers understand them. This is really amazing, because too many developers are too lazy to do this. PR titles are also properly labeled with the Jira ticket #.
- Someone is always reviewing before code is merged.
- Comments can be seen throughout the codebase.
- Tons of documentation.
As for the CreateSubmissionRequest REST API, technically this is a feature, not exactly a bug. But since it was enabled in the default configuration and not carefully documented, it became an overpowered feature that attackers could reliably abuse as a vulnerability. In the real world, Apache Spark tends to be used with a lot of data behind it, so companies definitely cannot afford to have this problem hanging around the network.