@mrocklin
Created June 20, 2016 20:03
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\" \n",
" width=\"30%\" \n",
" align=right\n",
" alt=\"Dask logo\">\n",
"\n",
"Custom Workflows\n",
"------------------\n",
"\n",
"We submit tasks directly to the task scheduler. This demonstrates the flexibility that can be achieved with the `submit` function and normal Python for loops.\n",
"\n",
"Later on we map functions across Python queues to construct data processing pipelines."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from dask.distributed import Executor, progress\n",
"e = Executor('localhost:8786')\n",
"e"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"from time import sleep\n",
"\n",
"def inc(x):\n",
"    from random import random\n",
"    sleep(random())\n",
"    return x + 1\n",
"\n",
"def double(x):\n",
"    from random import random\n",
"    sleep(random())\n",
"    return 2 * x\n",
"\n",
"def add(x, y):\n",
"    from random import random\n",
"    sleep(random())\n",
"    return x + y"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"inc(1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"future = e.submit(inc, 1) # returns immediately with pending future\n",
"future"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"future # scheduler and client talk constantly"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"future.result()"
]
},
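{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `submit` and `result` calls above have the same shape as the standard library's `concurrent.futures` interface. As a minimal sketch that runs without a scheduler, assume a local `ThreadPoolExecutor` stands in for the distributed executor; `pool` and `slow_inc` are illustrative names, not part of this notebook's cluster setup.\n",
"\n",
"```python\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"from time import sleep\n",
"\n",
"def slow_inc(x):\n",
"    sleep(0.1)  # simulate a task that takes a while\n",
"    return x + 1\n",
"\n",
"# Local stand-in for the distributed executor (an assumption for illustration)\n",
"with ThreadPoolExecutor(max_workers=2) as pool:\n",
"    future = pool.submit(slow_inc, 1)  # returns immediately with a future\n",
"    value = future.result()            # blocks until the task has finished\n",
"\n",
"finished = future.done()               # a resolved future reports done\n",
"```\n",
"\n",
"The thread pool only mirrors the calling pattern: the work stays in this process, whereas the executor above sends the function to a worker."
]
},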
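{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Submit many tasks\n",
"\n",
"We submit many tasks that depend on each other in a normal Python for loop. Each `submit` call accepts futures as arguments, so a dependency is expressed by passing one task's future into the next call."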
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"%%time\n",
"zs = []\n",
"for i in range(16):\n",
"    x = e.submit(inc, i)     # x = inc(i)\n",
"    y = e.submit(double, x)  # y = double(x)\n",
"    z = e.submit(add, x, y)  # z = add(x, y)\n",
"    zs.append(z)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"e.gather(zs)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Custom computation: Tree summation\n",
"\n",
"As an example of a non-trivial algorithm, consider the classic tree reduction, in which neighboring pairs are merged level by level until a single result remains. We accomplish this with a `while` loop around a `for` loop and a bit of normal Python logic.\n",
"\n",
"```\n",
"finish           total             single output\n",
"    ^          /        \\\n",
"    |        c1          c2        neighbors merge\n",
"    |       /  \\        /  \\\n",
"    |     b1    b2    b3    b4     neighbors merge\n",
"    ^    / \\   / \\   / \\   / \\\n",
"start   a1 a2 a3 a4 a5 a6 a7 a8    many inputs\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"L = zs\n",
"while len(L) > 1:\n",
"    new_L = []\n",
"    for i in range(0, len(L), 2):\n",
"        future = e.submit(add, L[i], L[i + 1])  # add neighbors\n",
"        new_L.append(future)\n",
"    L = new_L                                   # swap old list for new\n",
"\n",
"progress(L)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"L"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"e.gather(L)"
]
},
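{
"cell_type": "markdown",
"metadata": {},
"source": [
"The loop above pairs `L[i]` with `L[i + 1]`, so it relies on every level having an even length, which holds for the 16 inputs used here. A minimal sketch of the same pairwise reduction on plain values, assuming an odd leftover item is carried up to the next level unchanged (`tree_reduce` and `combine` are hypothetical names):\n",
"\n",
"```python\n",
"from operator import add\n",
"\n",
"def tree_reduce(items, combine):\n",
"    # Merge neighboring pairs level by level until one value remains\n",
"    level = list(items)\n",
"    if not level:\n",
"        raise ValueError('tree_reduce needs at least one item')\n",
"    while len(level) > 1:\n",
"        merged = [combine(level[i], level[i + 1])\n",
"                  for i in range(0, len(level) - 1, 2)]\n",
"        if len(level) % 2:  # odd length: carry the last item up unchanged\n",
"            merged.append(level[-1])\n",
"        level = merged\n",
"    return level[0]\n",
"\n",
"total = tree_reduce(range(1, 8), add)  # 7 inputs, not a power of two\n",
"```\n",
"\n",
"On the cluster, passing `lambda a, b: e.submit(add, a, b)` as `combine` should give the same tree with futures in place of values."
]
},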
{
"cell_type": "markdown",
"metadata": {
"collapsed": true
},
"source": [
"Example with data streams\n",
"----------------------------\n",
"\n",
"The executor can map functions over lists or queues. This is nothing more than calling `submit` many times. We can chain maps on queues together to construct simple data processing pipelines.\n",
"\n",
"All of this logic happens on the client-side. None of this logic was hard-coded into the scheduler. This simple streaming system is a good example of the kind of system that becomes easy for users to build when given access to custom task scheduling."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from queue import Queue\n",
"from threading import Thread\n",
"\n",
"def multiplex(n, q, **kwargs):\n",
"    \"\"\" Convert one queue into several equivalent Queues\n",
"\n",
"    >>> q1, q2, q3 = multiplex(3, in_q)\n",
"    \"\"\"\n",
"    out_queues = [Queue(**kwargs) for i in range(n)]\n",
"    def f():\n",
"        while True:\n",
"            x = q.get()\n",
"            for out_q in out_queues:\n",
"                out_q.put(x)\n",
"    t = Thread(target=f)\n",
"    t.daemon = True\n",
"    t.start()\n",
"    return out_queues"
]
},
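{
"cell_type": "markdown",
"metadata": {},
"source": [
"The thread inside `multiplex` loops forever and relies on being a daemon to exit. As a sketch of one alternative, assume the producer can mark the end of the stream with a sentinel object; `STOP` and `fan_out` are hypothetical names, and only the standard library is used.\n",
"\n",
"```python\n",
"from queue import Queue\n",
"from threading import Thread\n",
"\n",
"STOP = object()  # hypothetical sentinel marking the end of the stream\n",
"\n",
"def fan_out(n, q):\n",
"    # Copy every item from q onto n new queues until STOP arrives\n",
"    outs = [Queue() for _ in range(n)]\n",
"    def pump():\n",
"        while True:\n",
"            x = q.get()\n",
"            for out in outs:\n",
"                out.put(x)  # STOP is forwarded too, so consumers can stop\n",
"            if x is STOP:\n",
"                break\n",
"    t = Thread(target=pump, daemon=True)\n",
"    t.start()\n",
"    return outs, t\n",
"\n",
"src = Queue()\n",
"(q1, q2), worker = fan_out(2, src)\n",
"for item in [1, 2, STOP]:\n",
"    src.put(item)\n",
"worker.join()  # returns once STOP has been forwarded\n",
"received = [[q.get() for _ in range(2)] for q in (q1, q2)]\n",
"```\n",
"\n",
"Each output queue sees every item in the original order, which is what lets the `inc` and `double` branches below stay in step."
]
},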
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```\n",
"           ----inc---->\n",
"          /            \\\n",
"in_q --> q              \\_add__ results\n",
"          \\            /\n",
"           ---double-->/\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"in_q = Queue()\n",
"q = e.scatter(in_q)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"in_q.put(1)\n",
"q.get()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"q_1, q_2 = multiplex(2, q)\n",
"\n",
"inc_q = e.map(inc, q_1)\n",
"double_q = e.map(double, q_2)\n",
"\n",
"add_q = e.map(add, inc_q, double_q)\n",
"\n",
"out_q = e.gather(add_q)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"in_q.put(10)\n",
"out_q.get()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from random import random\n",
"\n",
"def feed(q):\n",
"    for i in range(10000):\n",
"        sleep(random())\n",
"        q.put(i)\n",
"\n",
"# feed raw values into in_q, the head of the pipeline\n",
"t = Thread(target=feed, args=(in_q,))\n",
"t.daemon = True\n",
"t.start()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"out_q.qsize()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.1"
}
},
"nbformat": 4,
"nbformat_minor": 0
}