Skip to content

Instantly share code, notes, and snippets.

@jacobtomlinson
Last active February 21, 2018 09:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jacobtomlinson/f4365393463fc3247f494673de110263 to your computer and use it in GitHub Desktop.
Save jacobtomlinson/f4365393463fc3247f494673de110263 to your computer and use it in GitHub Desktop.
Starting with daskernetes.ipynb
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "# Trying out daskernetes\n\nWe are excited to announce that the work we've been doing with distributed Dask clusters running on Kubernetes has been absorbed into an awesome new tool called Daskernetes through our work on the [Pangeo project](http://matthewrocklin.com/blog/work/2018/01/22/pangeo-2).\n\nThe [Daskernetes](https://github.com/dask/daskernetes) python module allows you to create personal [distributed Dask](https://distributed.readthedocs.io/en/latest/) cluster when running your notebook on a Kubernetes cluster. This article will walk through getting started with Daskernetes.\n\nThis article is also a notebook which you can [download](https://gist.github.com/jacobtomlinson/f4365393463fc3247f494673de110263) and run on [pangeo.pydata.org](http://pangeo.pydata.org) or your own Kubernetes cluster. Pangeo is an awesome project we are involved in to provide a scalable scientific analysis environment!"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Installing\n_This notebook assumes you are using Pangeo or you have [Kubernetes authentication](https://kubernetes.io/docs/admin/accessing-the-api/) set up, either with config in a `.kube` directory or via a service account._\n\nFirst we need to install Daskernetes.\n\n_If you're using Pangeo then you can skip this step as it is already available out of the box._"
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "!pip install daskernetes --user",
"execution_count": null,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Create your cluster\nWe need to import some dependancies. We'll import `os` so that we can access some environment variables later. Then we'll import `dask` and `distrubuted` and then finally our `KubeCluster` constructor from `daskernetes.core`.\n\nWe can now go ahead and create a cluster. Creating this object will start a Dask distributed scheduler within your notebook session. You can also specify details such as a name to prepend your worker container names with.\n\nWe'll set the number of workers to one for now but we can use the HTML widget to update this later. Finally we'll set some boiler plate details such as the ip and port to run the service on and the number of Python threads to start in each worker."
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "import os\nimport dask\nimport distributed\nimport daskernetes\n\ncluster = daskernetes.KubeCluster()",
"execution_count": 1,
"outputs": []
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "cluster",
"execution_count": null,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "You can access the Dask dashboard using the address information printed in the table, on Pangeo you can simply click the link!"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Create a client for your cluster\nNow that we have a cluster we can interact with it the usual Dask way. This is done by creating a dask distributed client, the cluster already contains the address information of the scheduler so we can pass that property into the client contructor."
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "client = distributed.Client(cluster.scheduler_address)\nclient",
"execution_count": null,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "You can now use the client within your dask distributed workflow and scale the number of workers dynamically using the widget above."
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Example distributed task\nLet's take our cluster for a test drive. We'll create a very simple lambda function which takes a number and cubes it, then we'll use dask to map that function over every number between 0 and 999 in a distributed fashion. Then we'll use distributed to sum the results and print the answer back into our notebook.\n\nFeel free to scale your cluster using the widget above and increase the range and see how quickly you can get it to run this over very large arrays of numbers."
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "cube = lambda x: x**3\n\ndata = client.scatter(range(100000))\n\ncubed_numbers = client.map(cube, data)\n\ntotal = client.submit(sum, cubed_numbers)\n\nprint(total.result())",
"execution_count": null,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Further topics"
},
{
"metadata": {},
"cell_type": "markdown",
"source": "### Adaptive\nYou can switch you cluster to an adaptive strategy by calling the following method on your cluster. This results in the cluster not paying attention to the requested number of workers either from the cluster constructor or the widget above. Instead it will scale the workers dynamically as tasks are submitted to the scheduler and completed."
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "cluster.adapt()",
"execution_count": null,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "### Cleaning up\nWhen you're done don't forget to close your cluster. Currently failing to call this will leave worker containers floating around in kubernetes after you close your notebook."
},
{
"metadata": {
"scrolled": false,
"trusted": true
},
"cell_type": "code",
"source": "cluster.close()",
"execution_count": null,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "### Scaling down\nIf you are affected by issue [#27](https://github.com/dask/daskernetes/issues/27) and cannot get your workers to scale down you can clean them up in the following way."
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "from daskernetes.core import cleanup_pods\n\ncleanup_pods(cluster.namespace, cluster.worker_labels)",
"execution_count": null,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "### Context managers\nYou can use daskernetes as a [context manager](https://docs.python.org/3/library/contextlib.html) which will create a cluster for the duration of some specific work. Let's run the example above but within the context of a cluster. This means a cluster will be created when the `with` statement is entered and it will be destroyed when the `with` statement exits. It avoids having to remember to close your cluster when you're done!\n\n_Be sure to close your cluster first if you created one above as you can only have one cluster in existance in your notebook at a time._"
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "with KubeCluster() as cluster:\n client = distributed.Client(cluster.scheduler_address)\n \n cube = lambda x: x**3\n data = client.scatter(range(100000))\n cubed_numbers = client.map(cube, data)\n total = client.submit(sum, cubed_numbers)\n print(total.result())",
"execution_count": null,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "### Logs\nWhen debugging distributed applications it can be frustrating seeing what went wrong as errors and exceptions happen on the workers and don't always bubble back up to your notebook. You can retrieve the logs from your workers by getting the identity of the worker pod and then calling `cluster.logs()`.\n\nHere's an example of getting the logs for the first worker pod in your cluster."
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "pod = cluster.pods()[0]\ncluster.logs(pod)",
"execution_count": null,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "## Conclusion\n\nWhile daskernetes and pangeo are still in early phases we already think this is going to be a vital tool in processing and analysing the vast quantities of data we are producing."
}
],
"metadata": {
"_draft": {
"nbviewer_url": "https://gist.github.com/f4365393463fc3247f494673de110263"
},
"gist": {
"id": "f4365393463fc3247f494673de110263",
"data": {
"description": "Starting with daskernetes.ipynb",
"public": true
}
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3",
"language": "python"
},
"language_info": {
"name": "python",
"version": "3.6.3",
"mimetype": "text/x-python",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"pygments_lexer": "ipython3",
"nbconvert_exporter": "python",
"file_extension": ".py"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment