Skip to content

Instantly share code, notes, and snippets.

@rom1504
Last active August 7, 2023 02:03
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rom1504/561aa2c519c8a7892fe36db77a068b88 to your computer and use it in GitHub Desktop.
Save rom1504/561aa2c519c8a7892fe36db77a068b88 to your computer and use it in GitHub Desktop.
spark_on_ssh.py
from pssh.clients import ParallelSSHClient
import subprocess
import socket
import fire
import os
def get_ips_of_slurm_job(job_id):
    """Return the IP addresses of the nodes reported for a Slurm job.

    Runs a ``squeue`` / ``sinfo`` shell pipeline to obtain node names and
    resolves each one with ``socket.gethostbyname``.

    Args:
        job_id: Slurm job identifier; converted with ``str`` and spliced
            into the shell command.

    Returns:
        A list of IP address strings, one per distinct node name, in the
        order the pipeline printed them.

    Raises:
        subprocess.CalledProcessError: if the shell pipeline exits non-zero.
        RuntimeError: if the pipeline prints no node names.
    """
    # The 8th whitespace-separated field of squeue's last output line is
    # passed to `sinfo -N -n` (presumably the NODELIST column of squeue's
    # default layout -- confirm against the cluster's squeue format). The
    # first sinfo line is skipped and the first field of each remaining
    # line is kept.
    c = "sinfo -N -n `squeue -j "+str(job_id)+" | tail -1 | awk '{print $8}'` | tail -n +2 | awk '{print $1}'"
    output = subprocess.check_output(c, shell=True).decode("utf8")
    # Drop blank lines: an empty name would be resolved by gethostbyname("")
    # to 0.0.0.0 instead of failing. dict.fromkeys removes repeated node
    # names (sinfo -N prints one line per node/partition pair) while
    # preserving order.
    hosts = list(dict.fromkeys(
        line.strip() for line in output.splitlines() if line.strip()
    ))
    if not hosts:
        raise RuntimeError(f"no nodes found for Slurm job {job_id}")
    return [socket.gethostbyname(host) for host in hosts]
def run(ips_to_run, command):
    """Run ``command`` over SSH on every host in ``ips_to_run`` and print the output.

    The host list is printed first. After the command has been dispatched,
    a list of ``(host, stdout_text)`` tuples is printed, where the host is
    an empty string when a result carries no client and the text is an
    empty string when a result carries no stdout.

    Args:
        ips_to_run: hosts to connect to; the pool size equals its length.
        command: command line executed on each host.
    """
    print(ips_to_run)
    ssh = ParallelSSHClient(ips_to_run, timeout=10, pool_size=len(ips_to_run))
    # stop_on_errors=False: a failure on one host does not abort the others.
    results = list(ssh.run_command(command, stop_on_errors=False))
    summary = []
    for res in results:
        host = "" if res.client is None else res.client.host
        text = "\n".join(res.stdout) if res.stdout else ""
        summary.append((host, text))
    print(summary)
def start_spark_cluster(ips, cpus, mem_in_gb, spark_path, spark_local_dir):
    """Start a Spark master on the first host and a worker on every host.

    Args:
        ips: host addresses; ``ips[0]`` becomes the master.
        cpus: value passed to ``start-worker.sh -c``.
        mem_in_gb: value passed to ``start-worker.sh -m`` with a ``g`` suffix.
        spark_path: directory containing ``spark-3.3.1-bin-hadoop3``.
        spark_local_dir: parent directory for ``SPARK_WORKER_DIR`` (``/work``)
            and ``SPARK_LOCAL_DIRS`` (``/local``).
    """
    master_addr = ips[0]
    sbin = f'{spark_path}/spark-3.3.1-bin-hadoop3/sbin'

    # The master listens on port 7077 and is bound to its own address.
    run([master_addr], f'{sbin}/start-master.sh -p 7077 -h {master_addr}')

    worker_cmd = f'{sbin}/start-worker.sh -c {cpus} -m {mem_in_gb}g "spark://{master_addr}:7077"'
    # Environment assignments are placed in front of the worker command
    # inside a single `bash -c` invocation.
    env_prefix = (
        "SPARK_WORKER_DIR=" + spark_local_dir + "/work "
        + "SPARK_LOCAL_DIRS=" + spark_local_dir + "/local "
    )
    # Workers are started on every host, including the master's.
    run(ips, "bash -c '" + env_prefix + worker_cmd + "'")
def stop_spark_cluster(ips):
    """Stop the cluster by running ``pkill java`` on every host in ``ips``.

    Note that ``pkill java`` matches by process name, so it is not limited
    to Spark processes.
    """
    kill_command = 'bash -c "pkill java"'
    run(ips, kill_command)
def main(cpus=48, mem_in_gb=256, spark_path=None, job_id=None, command="start", spark_local_dir="/tmp"):
    """Start or stop a Spark cluster on the nodes of a Slurm job.

    Args:
        cpus: cores given to each Spark worker.
        mem_in_gb: memory given to each Spark worker, in gigabytes.
        spark_path: directory containing the Spark distribution; defaults
            to the current working directory.
        job_id: Slurm job whose nodes host the cluster (required).
        command: ``"start"`` or ``"stop"``.
        spark_local_dir: parent directory for Spark's work and local dirs.

    Raises:
        ValueError: if ``command`` is not ``"start"`` or ``"stop"``, or if
            ``job_id`` is not provided.
    """
    # Validate before touching any node: an unknown command used to be a
    # silent no-op, and a missing job id used to be spliced into the Slurm
    # query as the literal string "None".
    if command not in ("start", "stop"):
        raise ValueError(f"unknown command {command!r}; expected 'start' or 'stop'")
    if job_id is None:
        raise ValueError("job_id is required")

    if spark_path is None:
        spark_path = os.getcwd()

    ips = get_ips_of_slurm_job(job_id)
    if command == "stop":
        stop_spark_cluster(ips)
    else:
        start_spark_cluster(ips, cpus, mem_in_gb, spark_path, spark_local_dir)
# Example invocation:
#   python spark_on_ssh.py --cpus=48 --mem_in_gb=256 --job_id=2213 --spark_local_dir="/scratch/spark" --command="start"
if __name__ == "__main__":
    # fire turns main's keyword arguments into command-line flags.
    fire.Fire(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment