{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Dask-Jobqueue with Singularity"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Need adapter for sbatch, scancel, squeue via SSH"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"mkdir -p bin"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting bin/sbatch\n"
]
}
],
"source": [
"%%file bin/sbatch\n",
"#!/usr/bin/env bash\n",
"\n",
"# dirty hack: This cluster has single-tenant nodes only and hence\n",
"# does not accept users trying to specify memory requirements.\n",
"# So instead of teaching Dask-Jobqueue to make the memory spec optional, \n",
"# we remove the conflicting header line just before submitting the job.\n",
"grep -v '#SBATCH \\-\\-mem=' ${1} > ${1}.corr\n",
"\n",
"# SSH back to the host (var is passed through to singularity anyway).\n",
"# Then, source /etc/profile to make it feel like an interactive session, \n",
"# and submit the job.\n",
"ssh $(hostname) -q -t \". /etc/profile && sbatch ${1}.corr\""
]
},
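{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before wiring the wrapper into Dask, we can sanity-check the `grep` filter in isolation. A minimal sketch on a throwaway job script (`demo_job.sh` is just a scratch file for this illustration):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%bash\n",
"# Write a dummy job script that contains a --mem header ...\n",
"cat > demo_job.sh <<'EOF'\n",
"#!/usr/bin/env bash\n",
"#SBATCH -J demo\n",
"#SBATCH --mem=100G\n",
"#SBATCH -t 00:01:00\n",
"echo hello\n",
"EOF\n",
"\n",
"# ... and confirm that the filter drops exactly that line:\n",
"grep -v '#SBATCH \\-\\-mem=' demo_job.sh"
]
},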
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting bin/scancel\n"
]
}
],
"source": [
"%%file bin/scancel\n",
"#!/usr/bin/env bash\n",
"\n",
"# See bin/sbatch for details\n",
"ssh $(hostname) -q -t \". /etc/profile && scancel $@\""
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Overwriting bin/squeue\n"
]
}
],
"source": [
"%%file bin/squeue\n",
"#!/usr/bin/env bash\n",
"\n",
"# See bin/sbatch for details\n",
"ssh $(hostname) -q -t \". /etc/profile && squeue $@\""
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"!chmod -R 755 bin/s*"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"pwd = !echo ${PWD}\n",
"os.environ[\"PATH\"] = f\"{pwd[0]}/bin:{os.environ['PATH']}\""
]
},
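{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick check that the wrappers now shadow any system-wide SLURM binaries (the paths printed depend on your setup):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"!which sbatch scancel squeue"
]
},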
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Set up dask jobqueue\n",
"\n",
"All we really need to do is make sure the generated job script loads the singularity module and uses a containered python executable."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"from dask_jobqueue import SLURMCluster\n",
"from dask.distributed import Client\n",
"import os"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"containered_python_exe = f\"singularity run {os.environ['SINGULARITY_CONTAINER']} python\""
]
},
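{
"cell_type": "markdown",
"metadata": {},
"source": [
"`$SINGULARITY_CONTAINER` is set by Singularity inside a running container and points at the image file. Should this notebook ever run outside the container, an explicit image path works as a fallback (a sketch; the fallback path below is a placeholder):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# NOTE: the fallback path is a placeholder -- substitute your own .sif file.\n",
"container_image = os.environ.get(\n",
"    \"SINGULARITY_CONTAINER\", \"/path/to/esm-vfc-stacks_latest.sif\"\n",
")\n",
"containered_python_exe = f\"singularity run {container_image} python\""
]
},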
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"cluster = SLURMCluster(\n",
" cores=40, memory=\"170G\", processes=4,\n",
" project=\"shk00023\",\n",
" queue='medium40:test', interface=\"ib0\",\n",
" walltime=\"00:20:00\",\n",
" env_extra=['module load singularity',], # ensure singularity is loaded\n",
" python=containered_python_exe, # use pyhton in container\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"#!/usr/bin/env bash\n",
"\n",
"#SBATCH -J dask-worker\n",
"#SBATCH -p medium40:test\n",
"#SBATCH -A shk00023\n",
"#SBATCH -n 1\n",
"#SBATCH --cpus-per-task=40\n",
"#SBATCH --mem=159G\n",
"#SBATCH -t 00:20:00\n",
"module load singularity\n",
"singularity run /home/shkifmwr/.singularity/cache/oci-tmp/0d046f7b1e98ec23133a61a71d65f550f300c8928db0d0da5ec99d4d524e962e/esm-vfc-stacks_latest.sif python -m distributed.cli.dask_worker tcp://10.246.201.2:42036 --nthreads 10 --nprocs 4 --memory-limit 42.50GB --name name --nanny --death-timeout 60 --interface ib0\n",
"\n"
]
}
],
"source": [
"print(cluster.job_script())"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Client</h3>\n",
"<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n",
" <li><b>Scheduler: </b>tcp://10.246.201.2:42036</li>\n",
" <li><b>Dashboard: </b><a href='http://10.246.201.2:8787/status' target='_blank'>http://10.246.201.2:8787/status</a></li>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Cluster</h3>\n",
"<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n",
" <li><b>Workers: </b>0</li>\n",
" <li><b>Cores: </b>0</li>\n",
" <li><b>Memory: </b>0 B</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: 'tcp://10.246.201.2:42036' processes=0 threads=0, memory=0 B>"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"client = Client(cluster)\n",
"display(client)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Let's scale the cluster to 12 nodes"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"cluster.scale(jobs=12)"
]
},
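{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, block until at least a few workers have connected before submitting work (`Client.wait_for_workers` is available in recent versions of distributed):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Wait until at least 4 workers have registered with the scheduler.\n",
"client.wait_for_workers(n_workers=4)"
]
},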
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" JOBID PARTITION NAME USER ACCOUNT STATE TIME NODES NODELIST(REASON)\n",
" 2337935 medium40:tes dask-worker shkifmwr shk00023 PENDING 0:00 1 (None)\n",
" 2337931 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:01 1 gcn1276\n",
" 2337932 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:01 1 gcn1277\n",
" 2337933 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:01 1 gcn1033\n",
" 2337934 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:01 1 gcn1034\n",
" 2337928 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:04 1 gcn1108\n",
" 2337929 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:04 1 gcn1224\n",
" 2337930 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:04 1 gcn1225\n",
" 2337925 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:07 1 gcn1353\n",
" 2337926 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:07 1 gcn1404\n",
" 2337927 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:07 1 gcn1107\n",
" 2337924 medium40:tes dask-worker shkifmwr shk00023 RUNNING 0:10 1 gcn1141\n"
]
}
],
"source": [
"!bin/squeue -u $USER"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<table style=\"border: 2px solid white;\">\n",
"<tr>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Client</h3>\n",
"<ul style=\"text-align: left; list-style: none; margin: 0; padding: 0;\">\n",
" <li><b>Scheduler: </b>tcp://10.246.201.2:42036</li>\n",
" <li><b>Dashboard: </b><a href='http://10.246.201.2:8787/status' target='_blank'>http://10.246.201.2:8787/status</a></li>\n",
"</ul>\n",
"</td>\n",
"<td style=\"vertical-align: top; border: 0px solid white\">\n",
"<h3 style=\"text-align: left;\">Cluster</h3>\n",
"<ul style=\"text-align: left; list-style:none; margin: 0; padding: 0;\">\n",
" <li><b>Workers: </b>40</li>\n",
" <li><b>Cores: </b>400</li>\n",
" <li><b>Memory: </b>1.70 TB</li>\n",
"</ul>\n",
"</td>\n",
"</tr>\n",
"</table>"
],
"text/plain": [
"<Client: 'tcp://10.246.201.2:42036' processes=28 threads=280, memory=1.19 TB>"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## A calculation\n",
"\n",
"We use my [goto example for Dask array demos](https://nbviewer.jupyter.org/github/willirath/dask_jobqueue_workshop_materials/blob/15874fe653f2c129fd47b3a156d9a16292978505/notebooks/01_local_cluster_monte_carlo_estimate_of_pi.ipynb): An embarassingly parallel Monte Carlo estimate of the number Pi.\n",
"\n",
"[<img src=\"https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif\" width=\"300px\" alt=\"PI monte-carlo estimate\">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"from dask import array as darr\n",
"import numpy as np"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"from 1000.0 GB of random data:\tpi = 3.141594476352, err = 1.8227622069488802e-06\n",
"\n",
"CPU times: user 7.31 s, sys: 656 ms, total: 7.97 s\n",
"Wall time: 11.3 s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"xy = darr.random.uniform(\n",
" 0, 1,\n",
" size=(1e12 / 16, 2), chunks=(500e6 / 16, None)\n",
")\n",
"pi = 4 * ((xy ** 2).sum(-1) < 1).mean()\n",
"\n",
"pi = pi.compute()\n",
"\n",
"print(f\"from {xy.nbytes / 1e9} GB of random data:\\tpi = {pi}, err = {pi - np.pi}\\n\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Adaptive clusters\n",
"\n",
"We'll set a target duration of 2 minutes to perform the same calculation with 5 TB of random data."
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [],
"source": [
"ca = cluster.adapt(\n",
" minimum=4, maximum=48,\n",
" target_duration=\"1200\", # measured in CPU time per worker\n",
" # -> 120 seconds at 10 cores / worker\n",
" wait_count=4 # scale down more gently\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
" JOBID PARTITION NAME USER ACCOUNT STATE TIME NODES NODELIST(REASON)\n",
" 2337927 medium40:tes dask-worker shkifmwr shk00023 RUNNING 1:26 1 gcn1107\n"
]
}
],
"source": [
"!bin/squeue -u $USER"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"from 5000.0 GB of random data:\tpi = 3.1415927031936, err = 4.960380683982635e-08\n",
"\n",
"CPU times: user 1min 19s, sys: 8.31 s, total: 1min 27s\n",
"Wall time: 2min 53s\n"
]
}
],
"source": [
"%%time\n",
"\n",
"xy = darr.random.uniform(\n",
" 0, 1,\n",
" size=(5e12 / 16, 2), chunks=(500e6 / 16, None)\n",
")\n",
"pi = 4 * ((xy ** 2).sum(-1) < 1).mean()\n",
"\n",
"pi = pi.compute()\n",
"\n",
"print(f\"from {xy.nbytes / 1e9} GB of random data:\\tpi = {pi}, err = {pi - np.pi}\\n\")"
]
}
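,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Shutting down\n",
"\n",
"Finally, we release the remaining SLURM jobs by closing the client and the cluster."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client.close()\n",
"cluster.close()"
]
}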
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6"
}
},
"nbformat": 4,
"nbformat_minor": 4
}