Skip to content

Instantly share code, notes, and snippets.

@smartnose
Last active March 20, 2024 21:23
Show Gist options
  • Star 42 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save smartnose/9f173b4c36dc31310e8efd27c3535a14 to your computer and use it in GitHub Desktop.
Save smartnose/9f173b4c36dc31310e8efd27c3535a14 to your computer and use it in GitHub Desktop.
Spark internal notes

Spark internals through code

Nothing gives you more detail about spark internals than actually reading it source code. In addition, you get to learn many design techniques and improve your scala coding skills. These are the random notes I make while reading the spark code. The best way to comprehend the notes is to load spark code into an IDE, e.g. IntelliJ, and navigate the code on the side.

Genesis - creation of a spark cluster

The scripts for creating a spark cluster are: start-master.sh and start-slave.sh. Read them carefully, and you can see that both scripts are very similar except the values for $CLASS variable. For start-master.sh, the value is CLASS="org.apache.spark.deploy.master.Master", while the value for start-slave.sh is shown below with more context.

# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
CLASS="org.apache.spark.deploy.worker.Worker"

if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
  echo "Usage: ./sbin/start-slave.sh [options] <master>"
  pattern="Usage:"
  pattern+="\|Using Spark's default log4j profile:"
  pattern+="\|Registered signal handlers for"

  "${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
  exit 1
fi

Here, both start-master.sh and start-slave.sh calls into spark-class, which is another bash shell that finally starts the JVM.

build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

$RUNNER is basically $JAVA_HOME/bin/java. Interestingly, launcher.Main isn't the main program that takes another class (e.g. org.apache.spark.deploy.worker.Worker), and invokes it. Instead, laucher.Main takes the class, figures out (1) generic java VM parameters and (2) class-specific parameters, and converts them together with the input class name into a string so jvm can in-turn run the class.

Now, we identified that the real work horses are Master.scala and Worker.scala. Now let's figure out their input, and behavior.

Inputs to the master and worker

MasterArguments.scala WorkerArguments.scala are pretty self-explanatory.

The master

Let's now look at the behavior of Master. The constructor parameters immediately reveals serveral interesting things:

  • RPC - the master and slaves have to talk to each other. The logic is encapsulated in RpcEnv (will discuss more in the following sections)
  • webUiPort - the port of web server for reporting the cluster status.
  • LeaderElectable trait - when the master fails, a new master will be elected.
private[deploy] class Master(
    override val rpcEnv: RpcEnv,
    address: RpcAddress,
    webUiPort: Int,
    val securityMgr: SecurityManager,
    val conf: SparkConf)
  extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {

The PRC

Both master and worker extends trait ThreadSafeRpcEndpoint. The Rpc mechanism is largely described in RpcEnv.scala and particularly in NettyRpcEnv.scala. Note that if you're reading codebase of Spark 1.6, you will also find AkkaRpcEnv.scala. This dependency on Akka is removed in the latest version (see this discussion on why removing the dependency)

Recovery

In case of crash, the cluster will recover itself by electing a new master node. This involes two steps: (1) persisting and reloading master node state (2) electing a new master node. Master made it clear that there are four different approaches (Zookeeper for cluster deployment, FileSystem for local mode, custom for extensibility, and blackhole which basically do not persist any state information):

    val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
      case "ZOOKEEPER" =>
        logInfo("Persisting recovery state to ZooKeeper")
        val zkFactory =
          new ZooKeeperRecoveryModeFactory(conf, serializer)
        (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
      case "FILESYSTEM" =>
        val fsFactory =
          new FileSystemRecoveryModeFactory(conf, serializer)
        (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
      case "CUSTOM" =>
        val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
        val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
          .newInstance(conf, serializer)
          .asInstanceOf[StandaloneRecoveryModeFactory]
        (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
      case _ =>
        (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
    }

Zookeeper recovery mode is the only interesting one worth explaining. Basically, all the nodes in the cluster will start both master and slave processes, but only one master process will become the leader. This is ensured through Zookeeper latch concurrency control. You can think of all these master processes are rushing to Zookeeper asking to be the leader, but zookeeper only gives the leader status to the first arrival and deny the rest. After then, the leadership position is renewed with Zookeeper and the state data is persisted to Zookeeper. Should the leader dies, Zookeeper will notify all other master processes, and one of them will become the new leader and load the state information.

Configuration

In a distributed system, the configuration data needs to be propagated to the entire cluster. If the configuration changes dynamically, the updates should also be visible for the all the nodes in the cluster. Spark configuration data can largely be grouped into:

  1. Data specific to Spark itself like the parameters for master and worker nodes.
  2. Configuration required by the underlying systems such as hadoop and log4j (log4j.properties)

For 1, a natural choice would be using Spark broadcasting infrastructure to propogate the configuration data. For 2, Spark rely on YARN and copy over the config files. An excerpt of the code that propogates configuration data:

    // Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that
    // the executors will use the latest configurations instead of the default values. This is
    // required when user changes log4j.properties directly to set the log configurations. If
    // configuration file is provided through --files then executors will be taking configurations
    // from --files instead of $SPARK_CONF_DIR/log4j.properties.

    // Also uploading metrics.properties to distributed cache if exists in classpath.
    // If user specify this file using --files then executors will use the one
    // from --files instead.
    for { prop <- Seq("log4j.properties", "metrics.properties")
          url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop))
          if url.getProtocol == "file" } {
      hadoopConfFiles(prop) = new File(url.getPath)
    }

Nuts and bolts

The important features of Spark go here.

Dataset

Typed Dataset

Generating schema

ScalaReflection

Broadcasting

One of the important features of Spark is broadcast variable and functions. BroadcastManager is responsible for creating broadcast variable, but it is a really lightweight class which only manages: (1) an incremental integer id to assign a unique id for each broadcast variable (2) delegate the function for creating broadcast variables to other classes such as BroadcastFactory and TorrentBroadcast to performan low-level serialization and inter-process communication.

PySpark

For data science projects, you will most likely use PySpark which provides a nice python portal to underlying Spark JVM APIs. There are times, however, you scratch your head and couldn't figure out why PySpark isn't doing what it's supposed to do. In this section, will describe how PySpark works internally, and propose a solution to collect logs from PySpark lamda functions.

The shell

The pyspark shell command reroutes to spark-submit, which in-turn goes to spark-class entry point as discussed in the genesis chapter. Here, the scala class invoked is defined in SparkSubmit.scala. SparkSubmit.scala is a very busy file that figures out how to shovel client-side commands, user-written scripts to the spark cluster. Because spark supports various cluster deployment modes, and different coding languages (R, Python, Java or Scala), SparkSubmit.scala has to figure out the right module which will handle the corresponding request. For python code, the following exerpt gives some hint:

    // If we're running a python app, set the main class to our specific python runner
    if (args.isPython && deployMode == CLIENT) {
      if (args.primaryResource == PYSPARK_SHELL) {
        args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
      } else {
        // If a python file is provided, add it to the child arguments and list of files to deploy.
        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
        args.mainClass = "org.apache.spark.deploy.PythonRunner"
        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
        if (clusterManager != YARN) {
          // The YARN backend distributes the primary file differently, so don't merge it.
          args.files = mergeFileLists(args.files, args.primaryResource)
        }
      }

The execution

Rdd.py serializes python lamda functions together with other supporting context data (module names, variables) and relay these data to the internal scala RDD type PythonRdd.scala. There are a few caveats in the process:

  • The serializer doesn't handle everything. Some of the context data needed to run the python lamda function may be too difficult to serialize (e.g. a lock object created by the containing scope of the lamda function). Serializing the state of this lock object involves much larger python internal state. As a result, the funcitons being broadcast to the worker nodes cannot access lower level IO or system objects. In addition, the lamda function cannot access Spark Context or Spark Session.
  • PythonRdd.scala does not do the computation itself (it simply doesn't know how to run python code). Instead, it starts a Python Worker, and talk to the worker through socket (excerpt from worker.py):
if __name__ == '__main__':
    # Read a local port to connect to from stdin
    java_port = int(sys.stdin.readline())
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(("127.0.0.1", java_port))
    sock_file = sock.makefile("rwb", 65536)
    main(sock_file, sock_file)

Logging in PySpark

Logging in PySpark is a frequently discussed issue. For example, see this and this. All of these solutions suggests relay the logging to JVM through sc._jvm.org.apache.log4j. Of course, a big downside is that you don't have access to spark context in lamda functions! For example, if you write the following code, spark will refuse to execute, complaining that broadcast function cannot be serialized.

def broadcast():
    log4jref = sc._jvm.org.apache.log4j
    LOGGER = log4jref.getLogger("daydream")
    LOGGER.info....
    
text_file = sc.textFile("hdfs://...")
mapped = text_file.map(broadcast)...

A cleaner solution is to rely on standard python logging library, so the code should look like this:

import logging
def broadcast():
    logger = logging.getLogger("broadcast")
    logger.info("here we go")...
    
text_file = sc.textFile("hdfs://...")
mapped = text_file.map(broadcast)...

This does not work well either: the log data, by default, will be output to stdout, which gets merged into log4j stdout, and saved as HDFS file streams. You can get the log data eventually through yarn log command. To improve, you can deploy a python log handler, for example, AWS SQS log handler that collects the log data in realtime to the cloud. You can then pull the log data using a downstream service to look at them as your job runs. You python code then look like this:

import logging
import sqsloghander
def broadcast():
    logger = logging.getLogger("broadcast")
    logger.addHandler(sqsloghandler...)
    logger.info("here we go")...
    
text_file = sc.textFile("hdfs://...")
mapped = text_file.map(broadcast)...

You can further simplify your code, by wrapping logging and sqsloghandler into a single python module, e.g. sqslogging, and set up the handler when initializing the module. When you do this, make sure the module is deployed to all nodes of the Spark cluster.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment