Skip to content

Instantly share code, notes, and snippets.

@lzamparo
Last active September 26, 2017 03:40
Show Gist options
  • Save lzamparo/1ec0275efac635b103725412808319e9 to your computer and use it in GitHub Desktop.
Save lzamparo/1ec0275efac635b103725412808319e9 to your computer and use it in GitHub Desktop.
dask.distributed on LSF
import os
import gzip
from dask.distributed import Client
from dask import delayed
# connect to scheduler
# Python does not expand "~" in paths, so the scheduler file must be expanded
# here. With the literal "~/..." path the client never finds the file and
# Client() blocks (the hang observed in the interactive session).
SCHEDULER_FILE = os.path.expanduser('~/utils/dask-distributed-test/scheduler.json')
client = Client(scheduler_file=SCHEDULER_FILE)

# try some computation on the input data files
DATA_DIR = '/data/leslie/zamparol/selex_data/raw_seqs/'
# os.listdir returns bare file names; join them with DATA_DIR so the paths can
# be opened by workers whose working directory is not DATA_DIR. Sub-directories
# are skipped because count_lines only handles regular files.
selex_files = sorted(
    os.path.join(DATA_DIR, name)
    for name in os.listdir(DATA_DIR)
    if os.path.isfile(os.path.join(DATA_DIR, name))
)
def count_lines(f):
    """Return the number of lines in the file at path ``f`` (plain or gzip).

    This is a plain function, not ``@delayed``: ``client.map`` already runs it
    as a task on the workers, and the caller sums the returned values, so it
    must return the count itself rather than a lazy object.

    Gzip input is recognised by its two-byte magic number (0x1f 0x8b) instead
    of "try gzip, fall back on any error". Real problems such as a missing
    file, missing permissions or a truncated gzip stream therefore raise,
    instead of being silently retried as plain text.

    Counting is done in binary mode while streaming, so no text decoding is
    attempted and the file is never held in memory all at once. A final line
    without a trailing newline still counts as a line.

    :param f: path to the file; it must be readable from the worker running
        the task.
    :returns: the line count as an int (0 for an empty file).
    """
    with open(f, 'rb') as probe:
        is_gzip = probe.read(2) == b'\x1f\x8b'
    opener = gzip.open if is_gzip else open
    with opener(f, 'rb') as handle:
        return sum(1 for _ in handle)
# Fan out one task per input file, sum the per-file counts on the cluster,
# and always release the scheduler connection, even if a task fails.
try:
    # send out the tasks: one future per file, each resolving to that file's line count
    line_count_futures = client.map(count_lines, selex_files)
    # gather and sum the results; submit() resolves the list of futures before calling sum
    total = client.submit(sum, line_count_futures)
    total_lines = total.result()
    print("total lines from all SELEX files was: ", total_lines)
finally:
    # close the client
    client.close()
{
"services": {
"http": 9786
},
"id": "Scheduler-f191fa4c-1c59-4885-a0fa-f40c3941cad3",
"workers": {},
"type": "Scheduler",
"address": "tcp://10.230.2.65:8786"
}
>>> import os
>>> import gzip
>>> from dask.distributed import Client
>>> from dask import delayed
>>> selex_files = [f for f in os.listdir('/data/leslie/zamparol/selex_data/raw_seqs/')]
>>> len(selex_files)
435
>>> @delayed
... def count_lines(f):
... try:
... with gzip.open(f, 'r') as myfile:
... numlines = myfile.readlines()
... except:
... with open(f, 'r') as myfile:
... numlines = myfile.readlines()
... return numlines
...
>>>
>>> client = Client(scheduler_file='~/utils/dask-distributed-test/scheduler.json')
>>>
>>> ### after Ctrl-D
distributed.utils - ERROR - 'Client' object has no attribute 'scheduler_comm'
Traceback (most recent call last):
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/distributed/utils.py", line 451, in log_errors
yield
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/distributed/client.py", line 897, in _close
if self.scheduler_comm and self.scheduler_comm.comm and not self.scheduler_comm.comm.closed():
AttributeError: 'Client' object has no attribute 'scheduler_comm'
distributed.utils - ERROR - 'Client' object has no attribute 'scheduler_comm'
Traceback (most recent call last):
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/distributed/utils.py", line 229, in f
result[0] = yield make_coro()
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 307, in wrapper
yielded = next(result)
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/distributed/client.py", line 897, in _close
if self.scheduler_comm and self.scheduler_comm.comm and not self.scheduler_comm.comm.closed():
AttributeError: 'Client' object has no attribute 'scheduler_comm'
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/distributed/utils.py", line 229, in f
result[0] = yield make_coro()
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
value = future.result()
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
raise_exc_info(self._exc_info)
File "<string>", line 4, in raise_exc_info
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/tornado/gen.py", line 307, in wrapper
yielded = next(result)
File "/home/zamparol/anaconda3/lib/python3.5/site-packages/distributed/client.py", line 897, in _close
if self.scheduler_comm and self.scheduler_comm.comm and not self.scheduler_comm.comm.closed():
AttributeError: 'Client' object has no attribute 'scheduler_comm'
#!/bin/bash
#BSUB -J mydaskscheduler
#BSUB -n 4
#BSUB -R span[ptile=2]
#BSUB -R rusage[mem=4]
#BSUB -W 03:00
#BSUB -o %J.stdout
#BSUB -eo %J.stderr
# LSF job script: run one dask-scheduler, which records its address in a
# shared scheduler file. The worker jobs and the client both read that file.
#
# NOTE(review): -n 4 with span[ptile=2] reserves 4 slots across 2 hosts, but
# only a single dask-scheduler process is started below; confirm whether a
# smaller request would do.
# NOTE(review): the unit of rusage[mem=4] depends on the cluster's LSF
# configuration; confirm it is the intended amount.

# Run from the submission directory; abort rather than run somewhere unexpected.
cd "$LS_SUBCWD" || exit 1

scheduler_file="$HOME/utils/dask-distributed-test/scheduler.json"
# Make sure the directory for the scheduler file exists before starting.
mkdir -p "$(dirname "$scheduler_file")"
dask-scheduler --scheduler-file "$scheduler_file"
#!/bin/bash
#BSUB -J mydaskworker
#BSUB -n 4
#BSUB -R span[ptile=2]
#BSUB -R rusage[mem=4]
#BSUB -W 01:00
#BSUB -o %J.stdout
#BSUB -eo %J.stderr
# LSF job script: start one dask-worker, which locates the scheduler through
# the shared scheduler file written by the scheduler job.
#
# NOTE(review): -n 4 with span[ptile=2] allocates slots on two hosts, but this
# script starts a single dask-worker process; confirm whether the slots on the
# second host are meant to be used (e.g. via blaunch).
# NOTE(review): the -W 01:00 walltime is shorter than the scheduler's 03:00.

# Run from the submission directory; abort rather than continue elsewhere.
cd "$LS_SUBCWD" || exit 1
echo "started worker"
dask-worker --scheduler-file "$HOME/utils/dask-distributed-test/scheduler.json"
#! /bin/bash
# Submit one dask scheduler job and NUM_WORKERS worker jobs to LSF.
# Each worker job depends on started(<scheduler job id>), so LSF holds the
# workers until the scheduler job is running.
#
# Usage: launcher NUM_WORKERS
# Expects start-scheduler.lsf and start-worker.lsf in the current directory.

# num workers
workers=$1
if ! [[ $workers =~ ^[0-9]+$ ]]; then
    echo "usage: $0 NUM_WORKERS" >&2
    exit 1
fi

# Remove a scheduler file left over from a previous run. Workers may start
# (started(jobid)) before the new scheduler rewrites it, and would otherwise
# read a stale address.
rm -f "$HOME/utils/dask-distributed-test/scheduler.json"

# start the scheduler, extract job id
# bsub reports the id as "Job <NNN> is submitted to ... queue <name>."
startjob=$(bsub < start-scheduler.lsf)
# Keep the regex in a variable so bash hands it to the regex engine unaltered.
job_re='Job <([0-9]+)>'
if ! [[ $startjob =~ $job_re ]]; then
    echo "could not extract scheduler job id from bsub output: $startjob" >&2
    exit 1
fi
jobid=${BASH_REMATCH[1]}
echo "extracted $jobid as job id"

# start the workers
for ((worker = 1; worker <= workers; worker++)); do
    bsub -w "started($jobid)" < start-worker.lsf
done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment