@jaceklaskowski
Last active February 19, 2023 14:55
Dask

Dask Distributed

Install

Guided by Development Guidelines.

$ brew install --cask miniconda
$ conda --version
conda 23.1.0
$ pwd
/Users/jacek/dev/oss/dask/distributed

$ git branch --show-current
2023.2.0
$ conda env create --file continuous_integration/environment-3.11.yaml
...
Successfully installed dask-2023.2.0+21.g0eb4bd0b fsspec-2023.1.0 keras-2.11.0 s3fs-2023.1.0+4.g4f05107 zict-2.3.0

done
#
# To activate this environment, use
#
#     $ conda activate dask-distributed
#
# To deactivate an active environment, use
#
#     $ conda deactivate
$ conda activate dask-distributed

CommandNotFoundError: Your shell has not been properly configured to use 'conda activate'.
To initialize your shell, run

    $ conda init <SHELL_NAME>

Currently supported shells are:
  - bash
  - fish
  - tcsh
  - xonsh
  - zsh
  - powershell

See 'conda init --help' for more information and options.

IMPORTANT: You may need to close and restart your shell after running 'conda init'.
$ conda init zsh
no change     /usr/local/Caskroom/miniconda/base/condabin/conda
no change     /usr/local/Caskroom/miniconda/base/bin/conda
no change     /usr/local/Caskroom/miniconda/base/bin/conda-env
no change     /usr/local/Caskroom/miniconda/base/bin/activate
no change     /usr/local/Caskroom/miniconda/base/bin/deactivate
no change     /usr/local/Caskroom/miniconda/base/etc/profile.d/conda.sh
no change     /usr/local/Caskroom/miniconda/base/etc/fish/conf.d/conda.fish
no change     /usr/local/Caskroom/miniconda/base/shell/condabin/Conda.psm1
no change     /usr/local/Caskroom/miniconda/base/shell/condabin/conda-hook.ps1
no change     /usr/local/Caskroom/miniconda/base/lib/python3.9/site-packages/xontrib/conda.xsh
no change     /usr/local/Caskroom/miniconda/base/etc/profile.d/conda.csh
modified      /Users/jacek/.zshrc

==> For changes to take effect, close and re-open your current shell. <==
$ conda activate dask-distributed

The shell prompt should change to reflect the current virtual env.

$ python -m pip install -e . # mind the `.` (dot) at the end of the command
...
Installing collected packages: distributed
  Attempting uninstall: distributed
    Found existing installation: distributed 2023.2.0
    Uninstalling distributed-2023.2.0:
      Successfully uninstalled distributed-2023.2.0
  Running setup.py develop for distributed
Successfully installed distributed-2023.2.0+8.gfef78d4d

Testing

$ py.test distributed --verbose

Starting your first local Dask cluster

Modelled after Starting your first local Ray cluster in the official documentation of the Ray framework.

At this point, it is assumed that the Dask and Dask Distributed modules have been installed (see Install above).

See also Install Dask.Distributed, which suggests that pip install dask distributed --upgrade should also work.

Following Quickstart.

$ python
Python 3.11.0 | packaged by conda-forge | (main, Jan 15 2023, 05:44:48) [Clang 14.0.6 ] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>>
>>> from dask.distributed import Client
>>> client = Client()
>>> client
<Client: 'tcp://127.0.0.1:62167' processes=4 threads=16, memory=32.00 GiB>

When we create a Client object it registers itself as the default Dask scheduler.
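A minimal pure-Python sketch of this "register as default on construction" pattern (an illustration only, not Dask's actual implementation; the names `_default_client` and `current_client` are hypothetical). It also mirrors the `set_as_default=False` keyword mentioned in the TODO below:

```python
# Illustration of the pattern dask.distributed.Client follows: constructing an
# instance registers it as the module-level default unless told otherwise.
_default_client = None

class FakeClient:
    def __init__(self, set_as_default=True):
        global _default_client
        if set_as_default:
            _default_client = self  # the newest client becomes the default

def current_client():
    return _default_client

a = FakeClient()                      # registers itself as the default
b = FakeClient(set_as_default=False)  # leaves the default untouched
assert current_client() is a
```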

$ conda activate dask-distributed
(dask-distributed) ╭─ ~/dev/oss/dask/distributed ‹heads/2023.2.0›
╰─$ type dask
dask is /usr/local/Caskroom/miniconda/base/envs/dask-distributed/bin/dask
>>> def square(x):
...         return x ** 2
...
>>> A = client.map(square, range(10))
>>> total = client.submit(sum, A)
>>> total.result()
285
>>> total
<Future: finished, type: int, key: sum-5a8f9434ea700d7327fda99520d28a9c>
>>> client.gather(A)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

The client associates a key with every computation. The key is accessible on the Future object.

>>> total.key
'sum-5a8f9434ea700d7327fda99520d28a9c'
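Dask's client API deliberately mirrors the standard library's concurrent.futures interface, so the same map/submit/gather flow from the session above can be sketched with a local executor (a thread-pool stand-in, not a Dask cluster; unlike Dask's client.submit, the stdlib executor cannot take futures as arguments, so we gather first and then sum):

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x ** 2

with ThreadPoolExecutor() as pool:
    # client.map(square, range(10)) -> a list of futures
    futures = [pool.submit(square, x) for x in range(10)]
    # client.gather(A) -> collect the results
    results = [f.result() for f in futures]
    # client.submit(sum, A).result() -> sum locally instead
    total = sum(results)

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
print(total)    # 285
```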

HTTP endpoints

https://distributed.dask.org/en/stable/http_services.html

>>> client
<Client: 'tcp://127.0.0.1:51481' processes=4 threads=16, memory=32.00 GiB>

It wasn't easy, but I finally found it in the Python API docs:

You can navigate to http://localhost:8787/status to see the diagnostic dashboard (the docs note it is available "if you have Bokeh installed").

Open http://localhost:8787/status.

In Cluster manager features:

>>> client.cluster.dashboard_link
'http://127.0.0.1:8787/status'

8787 is the default value of dashboard_address of distributed.deploy.local.LocalCluster (from Reference).
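A small stdlib sketch of how such a dashboard link relates to the scheduler address: same host, but the (default) dashboard port 8787. The helper name `dashboard_link` is hypothetical, chosen here to echo the attribute above:

```python
from urllib.parse import urlsplit

def dashboard_link(scheduler_address: str, dashboard_port: int = 8787) -> str:
    # 'tcp://127.0.0.1:62167' -> host '127.0.0.1'; swap in the dashboard port
    host = urlsplit(scheduler_address).hostname
    return f"http://{host}:{dashboard_port}/status"

print(dashboard_link("tcp://127.0.0.1:62167"))  # http://127.0.0.1:8787/status
```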

WebSocket

Use wscat to listen to scheduler events over a WebSocket at /eventstream.

$ npm install -g wscat
$ wscat -c localhost:8787/eventstream
Connected (press CTRL+C to quit)
< {"worker": "tcp://127.0.0.1:51496", "name": "add_worker"}
< {"worker": "tcp://127.0.0.1:51497", "name": "add_worker"}
< {"worker": "tcp://127.0.0.1:51498", "name": "add_worker"}
< {"worker": "tcp://127.0.0.1:51499", "name": "add_worker"}
> Ctrl+C
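The eventstream payloads are plain JSON, one object per message. A small stdlib sketch for decoding lines like those captured above (field names taken from the output; the sample data is copied from the transcript):

```python
import json

# Sample messages as captured from /eventstream above.
raw_events = [
    '{"worker": "tcp://127.0.0.1:51496", "name": "add_worker"}',
    '{"worker": "tcp://127.0.0.1:51497", "name": "add_worker"}',
]

events = [json.loads(line) for line in raw_events]
workers = [e["worker"] for e in events if e["name"] == "add_worker"]
print(workers)  # ['tcp://127.0.0.1:51496', 'tcp://127.0.0.1:51497']
```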

TODO

  1. Review /usr/local/Caskroom/miniconda/base/envs/dask-distributed/bin/dask
  2. Review map and submit methods (as they were mentioned in "Use the map and submit methods to launch computations on the cluster.")
  3. "We can stop this behavior by using the set_as_default=False keyword argument when starting the Client." in Dask