@mrocklin
Created November 22, 2016 22:47
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\" \n",
" width=\"30%\" \n",
" align=right\n",
" alt=\"Dask logo\">\n",
"\n",
"Custom Workflows\n",
"------------------\n",
"\n",
"We submit tasks directly to the task scheduler. This demonstrates the flexibility that can be achieved with the `submit` function and normal Python for loops.\n",
"\n",
"Later on we map functions across Python queues to construct data processing pipelines."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from dask.distributed import Client, progress\n",
"client = Client('localhost:8786')\n",
"client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from time import sleep\n",
"\n",
"def inc(x):\n",
"    from random import random\n",
"    sleep(random())\n",
"    return x + 1\n",
"\n",
"def double(x):\n",
"    from random import random\n",
"    sleep(random())\n",
"    return 2 * x\n",
"\n",
"def add(x, y):\n",
"    from random import random\n",
"    sleep(random())\n",
"    return x + y"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"inc(1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"future = client.submit(inc, 1) # returns immediately with pending future\n",
"future"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"future  # the scheduler keeps the client updated, so the status changes once inc finishes"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"future.result()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit many tasks\n",
"\n",
"We submit many tasks that depend on each other from a normal Python for loop. Passing a future as an argument to `submit` makes the new task wait for that future's result."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%time\n",
"zs = []\n",
"for i in range(256):\n",
"    x = client.submit(inc, i)     # x = inc(i)\n",
"    y = client.submit(double, x)  # y = double(x)\n",
"    z = client.submit(add, x, y)  # z = add(x, y)\n",
"    zs.append(z)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"client.gather(zs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Custom computation: Tree summation\n",
"\n",
"As an example of a non-trivial algorithm, consider the classic tree reduction, which repeatedly adds neighboring pairs until a single value remains. We accomplish this with a `for` loop nested inside a `while` loop and a bit of normal Python logic.\n",
"\n",
"```\n",
"finish           total             single output\n",
"  ^            /      \\\n",
"  |         c1          c2         neighbors merge\n",
"  |        /  \\        /  \\\n",
"  |      b1    b2    b3    b4      neighbors merge\n",
"  ^      / \\   / \\   / \\   / \\\n",
"start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"L = zs\n",
"while len(L) > 1:\n",
"    new_L = []\n",
"    for i in range(0, len(L), 2):\n",
"        future = client.submit(add, L[i], L[i + 1])  # add neighbors\n",
"        new_L.append(future)\n",
"    L = new_L  # swap old list for new\n",
"\n",
"progress(L)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"L"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"client.gather(L)"
]
},
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"Example with data streams\n",
"----------------------------\n",
"\n",
"The client can map functions over lists or queues. This is nothing more than calling `submit` many times. We can chain maps on queues together to construct simple data processing pipelines.\n",
"\n",
"All of this logic happens on the client side; none of it is hard-coded into the scheduler. This simple streaming system is a good example of the kind of system that becomes easy for users to build once they have access to custom task scheduling."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from queue import Queue\n",
"from threading import Thread\n",
"\n",
"def multiplex(n, q, **kwargs):\n",
"    \"\"\" Convert one queue into several equivalent Queues\n",
"\n",
"    >>> q1, q2, q3 = multiplex(3, in_q)\n",
"    \"\"\"\n",
"    out_queues = [Queue(**kwargs) for i in range(n)]\n",
"    def f():\n",
"        while True:\n",
"            x = q.get()\n",
"            for out_q in out_queues:\n",
"                out_q.put(x)\n",
"    t = Thread(target=f)\n",
"    t.daemon = True\n",
"    t.start()\n",
"    return out_queues"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```\n",
"           ----inc---->\n",
"          /            \\\n",
"in_q --> q              \\_add__ results\n",
"          \\             /\n",
"           ---double-->/\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"in_q = Queue()\n",
"q = client.scatter(in_q)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"in_q.put(1)\n",
"q.get()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"q_1, q_2 = multiplex(2, q)\n",
"\n",
"inc_q = client.map(inc, q_1)\n",
"double_q = client.map(double, q_2)\n",
"\n",
"add_q = client.map(add, inc_q, double_q)\n",
"\n",
"out_q = client.gather(add_q)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"in_q.put(10)\n",
"out_q.get()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from random import random\n",
"\n",
"def feed(q):\n",
"    for i in range(10000):\n",
"        sleep(random())\n",
"        q.put(i)\n",
"\n",
"t = Thread(target=feed, args=(q,))\n",
"t.daemon = True\n",
"t.start()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"out_q.qsize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
},
"widgets": {
"state": {
"b112c72cc59a4cdc8217351b61931334": {
"views": [
{
"cell_index": 11
}
]
}
},
"version": "1.2.0"
}
},
"nbformat": 4,
"nbformat_minor": 0
}
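This is a rough local analogue of the notebook's `client.submit(inc, 1)` and `future.result()` cells. It assumes only the standard library. `concurrent.futures.ThreadPoolExecutor.submit` likewise returns a pending future immediately, and `result()` blocks until the value is ready. The stdlib executor is swapped in here purely for illustration and is not dask.distributed. The fixed 0.1 s delay stands in for the notebook's random sleep.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep


def inc(x):
    sleep(0.1)  # fixed delay standing in for the notebook's random sleep
    return x + 1


with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(inc, 1)  # returns at once with a pending future
    value = future.result()       # blocks until inc(1) has finished
    print(future.done(), value)   # True 2
```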
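In the dependent-task loop, `add` waits for `inc` and `double` because the futures `x` and `y` are passed into `client.submit`. `concurrent.futures` does not resolve future arguments. The sketch below therefore uses a hypothetical helper, `submit_with_deps`, which blocks inside a worker thread until each argument future is done. This is an illustration under that assumption, not a description of how the dask scheduler works internally.

Every dependency is submitted before the tasks that use it, and the executor's work queue is first-in, first-out. As a result, the oldest unfinished task always has its inputs ready, and the loop cannot deadlock even with four threads.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def inc(x):
    return x + 1


def double(x):
    return 2 * x


def add(x, y):
    return x + y


def _resolve_and_call(fn, *args):
    # Replace every Future argument by its value (blocking if needed), then run fn.
    values = [a.result() if isinstance(a, Future) else a for a in args]
    return fn(*values)


def submit_with_deps(pool, fn, *args):
    """Hypothetical helper: submit fn so that Future arguments act as dependencies."""
    return pool.submit(_resolve_and_call, fn, *args)


with ThreadPoolExecutor(max_workers=4) as pool:
    zs = []
    for i in range(256):
        x = submit_with_deps(pool, inc, i)     # x = inc(i)
        y = submit_with_deps(pool, double, x)  # y = double(x)
        z = submit_with_deps(pool, add, x, y)  # z = add(x, y)
        zs.append(z)
    results = [z.result() for z in zs]        # analogue of client.gather(zs)

print(results[:4])  # [3, 6, 9, 12]
```

With the random sleeps removed, every `z` equals `3 * (i + 1)`.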
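Each gathered value has a closed form, because `x = inc(i)`, `y = double(x)` and `z = add(x, y)`. The same closed form also predicts what the tree summation should return:

$$
z_i = x_i + 2x_i = 3(i + 1), \qquad \sum_{i=0}^{255} z_i = 3\sum_{k=1}^{256} k = 3 \cdot \frac{256 \cdot 257}{2} = 98688
$$

So `client.gather(zs)` should begin `[3, 6, 9, ...]`, and `client.gather(L)` after the reduction should be `[98688]`.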
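The notebook's reduction loop reads `L[i + 1]` unconditionally, so it relies on the list length being even at every level. The 256 = 2^8 inputs satisfy that. Below is a minimal sequential sketch of the same pairwise reduction on plain values, with no scheduler involved. It also carries an unpaired element up a level. That odd-length handling is an assumption added here, not something the notebook does. The hypothetical `tree_reduce` returns the total, the number of `combine` calls, and the number of levels.

```python
from operator import add


def tree_reduce(values, combine=add):
    """Pairwise tree reduction.

    Returns (result, number_of_combine_calls, number_of_levels). An unpaired
    element at any level is carried up unchanged instead of raising IndexError.
    """
    if not values:
        raise ValueError("tree_reduce needs at least one value")
    level = list(values)
    calls = 0
    depth = 0
    while len(level) > 1:
        next_level = []
        for i in range(0, len(level), 2):
            if i + 1 < len(level):
                next_level.append(combine(level[i], level[i + 1]))  # merge neighbors
                calls += 1
            else:
                next_level.append(level[i])  # odd one out moves up a level
        level = next_level
        depth += 1
    return level[0], calls, depth


zs = [3 * (i + 1) for i in range(256)]  # the values client.gather(zs) returns
total, calls, depth = tree_reduce(zs)
print(total, calls, depth)  # 98688 255 8
```

A binary reduction of n inputs always performs n - 1 merges, spread over about log2(n) levels. For the notebook's 256 inputs, that is 255 `add` tasks over 8 levels.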
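The notebook's `multiplex` pumps items forever on a daemon thread. The variant below assumes you want the pump to shut down cleanly instead. It forwards a sentinel object (the hypothetical `_STOP`) to every output queue and then leaves the loop. The sketch also returns the thread so callers can `join()` it, which changes the original return value.

```python
from queue import Queue
from threading import Thread

_STOP = object()  # hypothetical end-of-stream marker; the notebook has none


def multiplex(n, q, **kwargs):
    """Copy every item from q onto n new queues; stop after forwarding _STOP."""
    out_queues = [Queue(**kwargs) for _ in range(n)]

    def pump():
        while True:
            x = q.get()
            for out_q in out_queues:
                out_q.put(x)  # every output queue sees every item, in order
            if x is _STOP:
                break

    t = Thread(target=pump, daemon=True)
    t.start()
    return out_queues, t


def drain(q):
    """Read items from q until _STOP arrives."""
    items = []
    while True:
        x = q.get()
        if x is _STOP:
            return items
        items.append(x)


in_q = Queue()
(q1, q2), pump_thread = multiplex(2, in_q)
for item in [1, 2, 3, _STOP]:
    in_q.put(item)
pump_thread.join()  # returns because _STOP ends the pump loop

first, second = drain(q1), drain(q2)
print(first, second)  # [1, 2, 3] [1, 2, 3]
```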
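The streaming cells depend on `client.map` and `client.gather` accepting queues. Below is a minimal local sketch of the same diamond-shaped pipeline, assuming only the standard library. A `concurrent.futures` thread pool stands in for the cluster, and the scatter step is skipped, so raw values are fed in directly. The names `fan_out`, `map_queue` and `gather_queue` are hypothetical helpers, not dask APIs.

Each stage is a daemon thread that pulls from its input queue(s), submits work, and pushes a future downstream. As in the notebook, all of the streaming logic lives on the client side.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from queue import Queue
from threading import Thread


def inc(x):
    return x + 1


def double(x):
    return 2 * x


def add(x, y):
    return x + y


def fan_out(n, q):
    """Like the notebook's multiplex: copy each item from q onto n queues."""
    outs = [Queue() for _ in range(n)]

    def pump():
        while True:
            x = q.get()
            for out in outs:
                out.put(x)

    Thread(target=pump, daemon=True).start()
    return outs


def map_queue(pool, fn, *in_queues):
    """Hypothetical helper: take one item from each input queue, submit fn, emit a Future."""
    out_q = Queue()

    def pump():
        while True:
            items = [q.get() for q in in_queues]  # zip-like pairing across inputs
            args = [a.result() if isinstance(a, Future) else a for a in items]
            out_q.put(pool.submit(fn, *args))

    Thread(target=pump, daemon=True).start()
    return out_q


def gather_queue(in_q):
    """Hypothetical helper: turn a queue of Futures into a queue of results, in order."""
    out_q = Queue()

    def pump():
        while True:
            out_q.put(in_q.get().result())

    Thread(target=pump, daemon=True).start()
    return out_q


pool = ThreadPoolExecutor(max_workers=4)
in_q = Queue()
q_1, q_2 = fan_out(2, in_q)
inc_q = map_queue(pool, inc, q_1)
double_q = map_queue(pool, double, q_2)
add_q = map_queue(pool, add, inc_q, double_q)
out_q = gather_queue(add_q)

in_q.put(10)
result = out_q.get(timeout=5)
print(result)  # 31, i.e. add(inc(10), double(10))
```

Feeding `10` produces `31`. The diagram sends the raw input to both `inc` and `double`, whereas in the earlier loop `double` received `inc(i)`.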