Skip to content

Instantly share code, notes, and snippets.

@aseigneurin
Created November 1, 2016 16:42
Show Gist options
  • Save aseigneurin/3af6b228490a8deab519c6aea2c209bc to your computer and use it in GitHub Desktop.
Save aseigneurin/3af6b228490a8deab519c6aea2c209bc to your computer and use it in GitHub Desktop.
Spark - High availability

Spark - High availability

Components in play

As a reminder, here are the components in play to run an application:

  • The cluster:
    • Spark Master: coordinates the resources
    • Spark Workers: offer resources to run the applications
  • The application:
    • Driver: the part of the application that coordinates the processing
    • Executors: the distributed part of the application that process the data

When the driver is run in cluster mode, it runs on a worker.

Notice that each component run its own JVM: the workers spawn separate JVMs to run the driver and the executors.

Fault tolerance

With a Spark standalone cluster, here is what happens if the JVM running a component dies:

  • Master: ❌ can no longer run new applications and the UI becomes unavailable.
  • Worker: ✅ not a problem, the cluster simply has less resources.
  • Driver: ❌ the application dies.
  • Executor: ✅ not a problem, the partitions being processed are sent to another executor.

Notice that losing a JVM or losing the whole EC2 instance has the same effect.

Here is how to deal with these problems:

  • Master -> setup a standby master.
  • Driver -> run the application in supervised mode.

Setting up a standby master

Since the Master is a single point of failure, Spark offers the ability to start another instance of a master which will be in standby until the active master disappears. When the standby master becomes the active master, the workers will reconnect to this master and existing applicatione will continue running without problem.

This functionality relies on ZooKeeper to perform master election.

Starting the masters

First step is to get a ZooKeeper cluster up and running. You can then start the master with options to connect to ZooKeeper, e.g.:

SPARK_MASTER_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=<zkhost>:2181"

You should see the following lines in the log:

2016-11-01 10:44:16 INFO ZooKeeperLeaderElectionAgent: Starting ZooKeeper LeaderElection agent
...
2016-11-01 10:44:57 INFO Master: I have been elected leader! New state: ALIVE

In the UI of this master, you should see:

Status: ALIVE

Now, if you start a master with the same ZooKeeper options on another node, you should only see a line indicating an election is being performed but this master should not become the leader:

2016-11-01 10:51:27 INFO ZooKeeperLeaderElectionAgent: Starting ZooKeeper LeaderElection agent
...

In the UI of this new master, you should see:

Status: STANDBY

Starting the workers

When starting the worker, you now have to provide the reference to all the masters:

start-slave.sh spark://master1:7077,master2:7077

The workers will only be visible in the UI of the active master but they will have the ability to connect to the new master if the active one dies.

Submitting applications

The same principle applies when submitting applications:

spark-submit --master spark://master1:7077,master2:7077 ...

Master election

Now, if you kill the active master, you should see the following lines in the log of the previously standby master:

2016-11-01 11:27:39 INFO ZooKeeperLeaderElectionAgent: We have gained leadership
2016-11-01 11:27:39 INFO Master: I have been elected leader! New state: RECOVERING
...
2016-11-01 11:27:39 INFO Master: Worker has been re-registered: worker-20161101112104-127.0.0.1-57477
2016-11-01 16/11/01 11:27:39 INFO Master: Application has been re-registered: app-20161101112623-0000
2016-11-01 16/11/01 11:27:39 INFO Master: Recovery complete - resuming operations!

Here is what this indicates:

  • The standby master became the active master.
  • The worker reconnected to the new master.
  • The running application reconnected to the new master.

In the log of the workers, you should see the following:

2016-11-01 11:26:55 ERROR Worker: Connection to master failed! Waiting for master to reconnect...
...
2016-11-01 11:27:39 INFO Worker: Master has changed, new master is at spark://127.0.0.1:9099

And in the log of the driver of the running application, you should see:

2016-11-01 11:26:55 WARN StandaloneSchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
2016-11-01 11:26:55 WARN StandaloneAppClient$ClientEndpoint: Connection to 127.0.0.1:7077 failed; waiting for master to reconnect...
2016-11-01 11:27:39 INFO StandaloneAppClient$ClientEndpoint: Master has changed, new master is at spark://127.0.0.1:9099

Running the application in supervised mode

The supervised mode allows the driver to be restarted on a different node if it dies. Enabling this functionality simply requires adding the --supervise flag when running spark-submit:

spark-submit --supervise ...

Notice that restarting the application is the easy part: you need to be aware of potential side effects of re-running the application. Let's say you need to update the balance of an account by $100. If you simply increment the current balance, re-running this operation will end up increasing the balance by $200.

A best practice is to make the whole application idempotent. Idempotence means that re-running the application doesn't change the values that have already been written.This means you need a way to determine if each operation has already been processed. Another way to achieve idempotence is to use event sourcing.

@mattdornfeld
Copy link

Sorry I know this isn't the right place to ask, but I rarely get good answers for Spark high availability on Stackoverflow. I'm trying to setup a high availability Spark stand alone cluster using FILESYSTEM recovery mode. I was able to successfully configure the recovery mode, and I'm trying to test it by manually killing the master. A new master then comes online and recovers state from the snapshot directory. Here are the logs from the new master.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/03/13 17:17:49 INFO Master: Started daemon with process name: 15@genisys-job-15-spark-master-5f975fc8cc-qz428
18/03/13 17:17:49 INFO SignalUtils: Registered signal handler for TERM
18/03/13 17:17:49 INFO SignalUtils: Registered signal handler for HUP
18/03/13 17:17:49 INFO SignalUtils: Registered signal handler for INT
18/03/13 17:17:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/03/13 17:17:50 INFO SecurityManager: Changing view acls to: root
18/03/13 17:17:50 INFO SecurityManager: Changing modify acls to: root
18/03/13 17:17:50 INFO SecurityManager: Changing view acls groups to:
18/03/13 17:17:50 INFO SecurityManager: Changing modify acls groups to:
18/03/13 17:17:50 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
18/03/13 17:17:50 INFO Utils: Successfully started service 'sparkMaster' on port 7077.
18/03/13 17:17:50 INFO Master: Starting Spark master at spark://0.0.0.0:7077
18/03/13 17:17:50 INFO Master: Running Spark version 2.2.0
18/03/13 17:17:51 INFO Utils: Successfully started service 'MasterUI' on port 8080.
18/03/13 17:17:51 INFO MasterWebUI: Bound MasterWebUI to 0.0.0.0, and started at http://10.233.84.86:8080
18/03/13 17:17:51 INFO Utils: Successfully started service on port 6066.
18/03/13 17:17:51 INFO StandaloneRestServer: Started REST server for submitting applications on port 6066
18/03/13 17:17:51 INFO FileSystemRecoveryModeFactory: Persisting recovery state to directory: /spark_recovery
18/03/13 17:17:51 INFO Master: I have been elected leader! New state: RECOVERING
18/03/13 17:17:51 INFO Master: Trying to recover app: app-20180313171643-0000
18/03/13 17:17:51 INFO Master: Trying to recover worker: worker-20180313171630-10.233.84.85-33259
18/03/13 17:17:51 INFO Master: Trying to recover worker: worker-20180313171630-10.233.121.213-33775
18/03/13 17:17:52 INFO TransportClientFactory: Successfully created connection to /10.233.84.85:33259 after 94 ms (0 ms spent in bootstraps)
18/03/13 17:17:52 INFO TransportClientFactory: Successfully created connection to /10.233.121.213:33775 after 94 ms (0 ms spent in bootstraps)
18/03/13 17:17:52 INFO TransportClientFactory: Successfully created connection to /10.233.121.214:32919 after 96 ms (0 ms spent in bootstraps)
18/03/13 17:17:52 INFO Master: Application has been re-registered: app-20180313171643-0000
18/03/13 17:17:52 INFO Master: Worker has been re-registered: worker-20180313171630-10.233.121.213-33775
18/03/13 17:17:52 INFO Master: Worker has been re-registered: worker-20180313171630-10.233.84.85-33259
18/03/13 17:17:52 INFO Master: Recovery complete - resuming operations!

This all seems to be working correctly. However the driver program on the client seems to be unable to find the new master. Here are the logs from the Spark process running on the client.

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).

18/03/13 17:16:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[Stage 0:>                                                        (0 + 0) / 200]18/03/13 17:16:47 WARN TaskSetManager: Stage 0 contains a task of very large size (932 KB). The maximum recommended task size is 100 KB.

[Stage 0:>                                                        (0 + 8) / 200]18/03/13 17:17:13 WARN StandaloneAppClient$ClientEndpoint: Connection to 0.0.0.0:7077 failed; waiting for master to reconnect...
18/03/13 17:17:13 WARN StandaloneSchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...

It seems to just hang there. Any idea why the client is unable to find the new master?

@markcb2
Copy link

markcb2 commented Jul 5, 2018

Is the --supervise option relevant for running the master in Yarn with deploy mode set to "cluster"?

@archenroot
Copy link

I am now on AWS considering EMR cluster, but for our streaming project i might need only Spark standalone in HA, will try to adopt this setup in Terraform.

@LegolasVzla
Copy link

LegolasVzla commented Jan 29, 2020

Hi, great material. I have some doubts, I installed Spark (with pip) and ZooKeeper on Ubuntu, when Spark is installed with pip, it doesn't create a conf folder like in other ways, so inside of my virtualenv, specifically in pyspark/bin folder, I created a highavailability.conf folder with the followed structure:

spark.deploy.recoveryMode=ZOOKEEPER
spark.deploy.zookeeper.url=localhost:2181
spark.deploy.zookeeper.dir=<virtual_env_path>/pyspark

Then, I start ZooKeer service and finally I run a spark master worker:

spark-class org.apache.spark.deploy.master.Master --properties-file <virtual_env_path>/highavailability.conf

Everything is good, a ZooKeeperLeader is selected, then I do the same thing in another terminal to wait. I Kill the first one and the second takes the control. My question is, when the first one was still ALIVE I could open spark UI in http://localhost:8080/, but when the second takes the control, http://localhost:8080/ is down, do I missing something? I have seen in a few links that in spark.deploy.zookeeper.url parameter specified more than one hosts, to start ZooKeeper multi node, but with only localhost:2181 it works for me, so if the first spark master worker is running in the 8080 port, second one is running by default in which port? Thanks in advance

UPDATED: i find the solution:

I just specified in the spark-class command the below parameter:
--webui-port 8080 and for the other node: --webui-port 8081

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment