@ryan-williams
Last active April 1, 2020 05:54
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Is [Dask `Delayed`](https://docs.dask.org/en/latest/delayed.html) a Monad?\n",
"[Dask](https://dask.org/) is a very cool Python library, and one of its core abstractions is the \"[Delayed](https://docs.dask.org/en/latest/delayed.html)\" container.\n",
"\n",
"`delayed`:\n",
"- wraps values\n",
"- records subsequent operations on these wrapped values (without performing the operations)\n",
"- lets you think you are working with raw values\n",
"\n",
"Meanwhile, it builds a graph of your program, providing opportunities to [visualize](https://docs.dask.org/en/latest/graphviz.html), [optimize](https://docs.dask.org/en/latest/optimize.html), and [parallelize](https://distributed.dask.org/en/latest/) your computations.\n",
"\n",
"## Contextualizing Dask Delayed\n",
"Delayed closely resembles abstractions found in other contexts: it provides \"laziness\" and \"provenance\", and I suspected that it's a [Monad](https://en.wikipedia.org/wiki/Monad_(functional_programming)) (though the API doesn't make that obvious).\n",
"\n",
"I recently needed to do [some relatively intricate Dask-graph manipulations](https://nbviewer.jupyter.org/gist/ryan-williams/0aeb2111cb8b7fd588784e2a7c35433e/chain-example.ipynb), and had trouble eking simple monadic transformations out of Dask's APIs, so here I'll compare [`Delayed`](https://docs.dask.org/en/latest/delayed.html) to a few relevant [category-theoretic](https://en.wikipedia.org/wiki/Category_theory) concepts (as implemented in [Scala's \"Cats\" library](https://typelevel.org/cats)):\n",
"- [**`Delayed` is an Applicative**](#applicative)\n",
"  - [`pure`](#pure)\n",
"  - [`ap`](#ap)\n",
"  - [`map`](#map)\n",
"- [**`Delayed` is a Monad**](#monad)\n",
"  - [`flatten`](#flatten)\n",
"  - [`flatMap`](#flatMap)\n",
"- [**Discussion**](#discussion)\n",
"  - [Sequencing computations](#sequencing)\n",
"  - [Example: task-chaining](#chaining)\n",
"  - [Dynamic task graphs](#dynamic-graphs)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from dask import delayed"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"--------\n",
"## Delayed is an [Applicative](https://typelevel.org/cats/typeclasses/applicative.html#applicative) <a id=\"applicative\"></a>\n",
"[Applicatives](https://typelevel.org/cats/typeclasses/applicative.html#applicative) support 3 operations:\n",
"\n",
"| Name | Type | Notes |\n",
"| ---- | ---- | ----- |\n",
"| `pure` | `A ⇒ F[A]` | Lift any value into the `Delayed` container |\n",
"| `ap` | `(F[A], F[A ⇒ B]) ⇒ F[B]` | Apply a `Delayed` function to a `Delayed` value and obtain a `Delayed` result |\n",
"| `map` | `(F[A], A ⇒ B) ⇒ F[B]` | Apply a function to a `Delayed` value and obtain a `Delayed` result |\n",
"\n",
"See examples below:\n",
"### `pure` <a id=\"pure\"></a>\n",
"`A ⇒ F[A]`; lift any value into the `Delayed` container:"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Delayed('int-fdb51b88-bca5-44de-b591-6d98f19b6497')"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"a = 4\n",
"fa = delayed(a); fa"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\"Unboxing\", just to check:"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"fa.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A \"generic\" implementation is trivial:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def pure(a): return delayed(a)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In fact, `delayed` itself is the simplest generic implementation of the `pure` capability:"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"pure = delayed"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### `ap` <a id=\"ap\"></a>\n",
"`(F[A], F[A ⇒ B]) ⇒ F[B]`; apply a `Delayed` function to a `Delayed` value and obtain a `Delayed` result"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Delayed('hex-38be2eca-d878-44fd-bc9e-7749e355a8a5')"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"_hex = delayed(hex) # F[int ⇒ str]: a delayed function from an int to a str\n",
"fb = _hex(fa); fb"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'0x4'"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"fb.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We passed a `Delayed` value to a `Delayed` function and the two levels of \"delay\" were \"flattened\" into the usual/desired one level, so that a single `.compute()` returns the value.\n",
"\n",
"🆒😎\n",
"\n",
"The generic implementation is again trivial:"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"ap = lambda fa, f_a_b: f_a_b(fa)"
]
},
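{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here is a quick, non-exhaustive spot-check that these definitions behave like a lawful `Applicative`. The cell below defines `check_applicative_laws`, a hypothetical helper that is not part of Dask. It tests the *identity* and *homomorphism* laws for one value and one function, written directly in terms of `delayed`. `Delayed` objects don't compare by value, so it compares the results of `.compute()`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask import delayed\n",
"\n",
"def check_applicative_laws(x, f):\n",
"    '''Spot-check two applicative laws for Delayed, using pure = delayed\n",
"    and ap(fa, f_a_b) = f_a_b(fa), by comparing computed results.'''\n",
"    v = delayed(x)  # pure(x)\n",
"    # Identity: ap(v, pure(lambda y: y)) == v\n",
"    assert delayed(lambda y: y)(v).compute() == v.compute()\n",
"    # Homomorphism: ap(pure(x), pure(f)) == pure(f(x))\n",
"    assert delayed(f)(delayed(x)).compute() == delayed(f(x)).compute()\n",
"    return True\n",
"\n",
"check_applicative_laws(4, hex)"
]
},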
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### `map` <a id=\"map\"></a>\n",
"`(F[A], A ⇒ B) ⇒ F[B]`; apply a function to a `Delayed` value and obtain a `Delayed` result\n",
"\n",
"This is the defining capability of a [Functor](https://typelevel.org/cats/typeclasses/functor.html), which [Applicatives](https://typelevel.org/cats/typeclasses/applicative.html#applicative) implement, and is conceptually simpler than [`ap`](#ap).\n",
"\n",
"However, it's not obvious how to do this in Dask, at least directly. Dask really wants to work with `Delayed` functions, not feed `Delayed` values directly to eager functions.\n",
"\n",
"Luckily, [`pure`](#pure) and [`ap`](#ap) are sufficient to implement `map` (which also explains why [`Applicative`s](https://typelevel.org/cats/typeclasses/applicative.html#applicative) are [`Functor`s](https://typelevel.org/cats/typeclasses/functor.html)):"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"map = lambda fa, a_b: ap(fa, pure(a_b))  # i.e. delayed(a_b)(fa)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Example:"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Delayed('hex-3d16e63e-8790-4680-9c71-be3bf71ac3dc')"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"fb = map(fa, hex); fb"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'0x4'"
]
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"fb.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here we turned an `F[int]` and an `int ⇒ str` into an `F[str]` 👍"
]
},
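{
"cell_type": "markdown",
"metadata": {},
"source": [
"`ap` extends naturally to functions of several arguments. Calling a delayed function with several `Delayed` arguments gives what Cats calls `map2` (`(F[A], F[B], (A, B) ⇒ C) ⇒ F[C]`). Neither input depends on the other, so Dask is free to compute them in parallel. Below is a minimal sketch; `map2` here is a hypothetical helper, not a Dask function:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from operator import add\n",
"from dask import delayed\n",
"\n",
"def map2(fx, fy, f):\n",
"    # (F[A], F[B], (A, B) ⇒ C) ⇒ F[C]: both inputs become dependencies of one new task\n",
"    return delayed(f)(fx, fy)\n",
"\n",
"summed = map2(delayed(4), delayed(38), add)\n",
"summed.compute()  # 42"
]
},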
{
"cell_type": "markdown",
"metadata": {},
"source": [
"-------\n",
"## Is `Delayed` a [Monad](https://typelevel.org/cats/typeclasses/monad.html)? <a id=\"monad\"></a>\n",
"Great, `Delayed` is an [Applicative](https://typelevel.org/cats/typeclasses/applicative.html#applicative) (and a [Functor](https://typelevel.org/cats/typeclasses/functor.html)); but is it a [Monad](https://typelevel.org/cats/typeclasses/monad.html)?\n",
"\n",
"According to [Cats' Monad docs](https://typelevel.org/cats/typeclasses/monad.html#monad):\n",
"> `Monad` extends the `Applicative` type class with a new function `flatten`. Flatten takes a value in a nested context (eg. `F[F[A]]` where F is the context) and “joins” the contexts together so that we have a single context (ie. `F[A]`).\n",
"\n",
"So, on top of the applicative operations above, all we need in order to show that `Delayed` is a monad is a (lawful) `flatten`!\n",
"\n",
"### `flatten` <a id=\"flatten\"></a>\n",
"`F[F[A]] ⇒ F[A]`: \"join\" two levels of `Delayed` into one\n",
"\n",
"(Note: this operation is commonly called \"join\" as well)\n",
"\n",
"Naively, we can always remove a layer of `F[_]` wrapping by calling `.compute()` (which has type `F[A] ⇒ A`):"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"fa.compute() # F[int] ⇒ int"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In principle flattening is as simple as:"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"def flatten(fa): return fa.compute() # F[A] ⇒ A"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We know this works for `F[A] ⇒ A`, but what about `F[F[A]] ⇒ F[A]`?\n",
"\n",
"Substituting \"`F[A]`\" for \"`A`\" in the type `F[A] ⇒ A` yields `F[F[A]] ⇒ F[A]`, indicating that it *should* work, but let's try to construct an `F[F[A]]` to test it:"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Delayed('int-fdb51b88-bca5-44de-b591-6d98f19b6497')"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ffa = delayed(fa); ffa"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"It turns out that `delayed` is clever: when handed a value that is already `Delayed`, it returns it unchanged (note the identical key above), short-circuiting the double-wrap. A single `.compute()` gives us our value back:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ffa.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"However, we can still pull one over on `delayed`:"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Delayed('lambda-3e297b56-1348-49ca-b3a2-7ad5a0b9a7d5')"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ffa = delayed(lambda a: delayed(a))(a); ffa"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"`ffa` is truly of type `F[F[A]]`; calling `.compute()` gives us an `F[A]` (still Delayed!):"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Delayed('int-174298a5-f846-4c8d-8b51-166eff6c0dd4')"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ffa.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Calling `.compute()` twice yields our original value again:"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"4"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ffa.compute().compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Clearly, `flatten` (a.k.a. `compute`) is possible for `Delayed` objects, albeit eagerly: `.compute()` runs the outer computation as soon as it is called. So…\n",
"\n",
"#### 🎉🎉 `Delayed` is a monad! 🎉🎉"
]
},
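{
"cell_type": "markdown",
"metadata": {},
"source": [
"One caveat is worth making explicit. `flatten` as defined above calls `.compute()` the moment it is invoked, so the outer computation runs eagerly instead of being recorded in the graph. A lazy variant can be sketched by moving that `.compute()` into a task of its own. `lazy_flatten` is a hypothetical helper. It assumes that calling `.compute()` from inside a running task is acceptable for your scheduler; the `sequence` helper below relies on the same thing with Dask's default threaded scheduler:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask import delayed\n",
"\n",
"def lazy_flatten(ffx):\n",
"    # F[F[A]] ⇒ F[A], lazily: the new task receives the inner Delayed (the\n",
"    # outer task's result) and computes it only when the graph actually runs\n",
"    return delayed(lambda fx: fx.compute())(ffx)\n",
"\n",
"nested = delayed(lambda v: delayed(v))(4)  # an F[F[int]], built as above\n",
"flat = lazy_flatten(nested)                # still Delayed; nothing has run yet\n",
"flat.compute()  # 4"
]
},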
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### `flatMap` <a id=\"flatMap\"></a>\n",
"`(F[A], A ⇒ F[B]) ⇒ F[B]`\n",
"\n",
"For completeness, let's discuss `flatMap`.\n",
"\n",
"#### intro\n",
"`flatMap` is typically the core behavior of a Monad:\n",
"- a `Delayed` value (`F[A]`) is fed to a function\n",
"- that function, roughly:\n",
"  - enters the `F` \"box\" where the actual `A` value is\n",
"  - transforms the `A` to a `B` *while introducing a second level of delay/wrapping* (`A ⇒ F[B]`)\n",
"  - leaves the box (meaning now we expect to be doubly-nested in two layers of `F[_]`, with an `F[F[B]]`).\n",
"\n",
"#### magic\n",
"Yet, somehow, the magic of `flatMap` yields a transformed – but only singly-wrapped – value (`F[B]`).\n",
"\n",
"This \"magic\" – the specifics of *how* a monad takes something doubly-wrapped, and unwraps a layer (or, more commonly, fakes it and lets you carry on as if it has) – is what defines the big, famous, common monads:\n",
"\n",
"- Maybe (a.k.a. Option)\n",
"- Future (a.k.a. Promise; see also async/await)\n",
"- List, Set\n",
"- Try (like Option, but stores+propagates an error)\n",
"- IO, Lazy (basically what Dask Delayed does)\n",
"\n",
"#### deriving from `map`, `flatten`\n",
"With all that said, `flatMap` is trivially implementable in terms of `map` and `flatten`:"
]
},
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"flatMap = lambda fa, a_fb: flatten(map(fa, a_fb))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Testing:"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'0x4'"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"int_to_delayed_str = lambda a: delayed(hex(a))\n",
"\n",
"flatMap(fa, int_to_delayed_str).compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Hooray!"
]
},
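{
"cell_type": "markdown",
"metadata": {},
"source": [
"Having `flatten` (and hence `flatMap`) is the structural part. A lawful monad's `pure` and `flatMap` should also satisfy *left identity*, *right identity*, and *associativity*. The cell below is a similarly quick, non-exhaustive spot-check that compares `.compute()` results.\n",
"\n",
"The trailing-underscore helpers are local copies of the definitions above (including the eager, `.compute()`-based `flatten`), so the cell stands on its own. `to_hex` and `to_upper` are arbitrary example functions:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask import delayed\n",
"\n",
"# Local copies of the helpers above (flatten_ is the eager, .compute()-based one)\n",
"pure_ = delayed\n",
"map_ = lambda fx, f: delayed(f)(fx)\n",
"flatten_ = lambda ffx: ffx.compute()\n",
"flat_map_ = lambda fx, f: flatten_(map_(fx, f))\n",
"\n",
"to_hex = lambda v: delayed(hex(v))       # A ⇒ F[B]\n",
"to_upper = lambda s: delayed(s.upper())  # B ⇒ F[C]\n",
"m = pure_(4)\n",
"\n",
"# Left identity: flatMap(pure(a), f) == f(a)\n",
"assert flat_map_(pure_(4), to_hex).compute() == to_hex(4).compute()\n",
"# Right identity: flatMap(m, pure) == m\n",
"assert flat_map_(m, lambda v: pure_(v)).compute() == m.compute()\n",
"# Associativity: flatMap(flatMap(m, f), g) == flatMap(m, a ⇒ flatMap(f(a), g))\n",
"assert (flat_map_(flat_map_(m, to_hex), to_upper).compute()\n",
"        == flat_map_(m, lambda v: flat_map_(to_hex(v), to_upper)).compute())"
]
},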
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Discussion <a id=\"discussion\"></a>\n",
"\n",
"### Sequencing computations <a id=\"sequencing\"></a>\n",
"A defining characteristic of Monads is that they support *sequencing* computations (i.e. running things in serial, one after another).\n",
"\n",
"That's ironic given Dask's focus on parallel computation, but nevertheless frequently useful.\n",
"\n",
"The intuition for why that is can be found in the signature of [`flatMap`](#flatMap):\n",
"```\n",
"(F[A], A ⇒ F[B]) ⇒ F[B]\n",
"```\n",
"You start with one computation (`F[A]`), and you end with another (`F[B]`); could they both be running concurrently at some point?\n",
"\n",
"The answer is no: the function `A ⇒ F[B]` can only run once the `A` from the original `F[A]` is available (i.e. computed and unboxed), and the second computation, `F[B]`, is never created (or introduced into the task graph) until that function has returned!\n",
"\n",
"#### Case study: \"chaining\" tasks <a id=\"chaining\"></a>\n",
"Let's leverage this \"sequencing\" property to construct a function that, given two tasks `F[A]` and `F[B]`, returns a task that runs `F[A]` to completion, then `F[B]`, and yields both values (`(F[A], F[B]) ⇒ F[(A, B)]`):"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"from time import sleep\n",
"def id(x):\n",
"    '''Simple function that returns the value it's passed\n",
"\n",
"    It also prints its progress, and sleeps on a certain value\n",
"    (which we'll pass first, to verify tasks aren't running\n",
"    concurrently)'''\n",
"\n",
"    print(f'computing: {x}')\n",
"    if x == 'A':\n",
"        print('Sleeping…')\n",
"        sleep(1)\n",
"    print(f'computed: {x}')\n",
"    return x\n",
"\n",
"id = delayed(id)\n",
"\n",
"# Sample \"delayed\" values\n",
"fa = id('A', dask_key_name='idA')\n",
"fb = id('B', dask_key_name='idB')"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
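"def sequence(fa, fb):\n",
"    # Helper which receives a computed `a`, and in turn computes `b`.\n",
"    # `fb` is captured in the closure (not passed as an argument), so Dask\n",
"    # does not schedule it as a dependency that could run alongside `fa`.\n",
"    _sequence = lambda a: (a, fb.compute())\n",
"\n",
"    name = f\"combine_{fb.key}\"\n",
"    return delayed(_sequence)(fa, dask_key_name=name)"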
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"Delayed('combine_idB')"
]
},
"execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"res = sequence(fa, fb); res"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"computing: A\n",
"Sleeping…\n",
"computed: A\n",
"computing: B\n",
"computed: B\n"
]
},
{
"data": {
"text/plain": [
"('A', 'B')"
]
},
"execution_count": 24,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"res.compute()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We can see the task graph shows `A` being computed, and then combined with `B`:"
]
},
{
"cell_type": "code",
"execution_count": 25,
"metadata": {},
"outputs": [],
"source": [
"res.visualize()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"(the original `B` node does not appear on its own in Dask's view of this computation; it is \"side-loaded\" by the `combine_idB` node, but that is OK!)"
]
},
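{
"cell_type": "markdown",
"metadata": {},
"source": [
"For contrast, here is a sketch of the *applicative* way to combine two tasks. Passing both as arguments to one delayed function makes each an explicit dependency in the graph, so Dask may run them concurrently. A `sequence`-style closure, by contrast, keeps the second task out of the outer graph entirely. `slow` and the key names are hypothetical stand-ins. Whether the two tasks actually overlap depends on the scheduler and the available threads:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from time import sleep\n",
"from dask import delayed\n",
"\n",
"@delayed\n",
"def slow(x):\n",
"    # Hypothetical stand-in task: sleeps briefly, then returns its input\n",
"    sleep(0.5)\n",
"    return x\n",
"\n",
"ta = slow('A', dask_key_name='slowA')\n",
"tb = slow('B', dask_key_name='slowB')\n",
"\n",
"# Applicative combination: both tasks are explicit dependencies of 'pair_AB'\n",
"pair = delayed(lambda a, b: (a, b))(ta, tb, dask_key_name='pair_AB')\n",
"\n",
"# Monadic (sequence-style) combination: tb is captured in the closure and\n",
"# computed inside 'chain_AB', so it never appears in the outer graph\n",
"chained = delayed(lambda a: (a, tb.compute()))(ta, dask_key_name='chain_AB')\n",
"\n",
"print(sorted(dict(pair.dask)))\n",
"print(sorted(dict(chained.dask)))\n",
"print(pair.compute(), chained.compute())"
]
},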
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Dynamic Task Graphs <a id=\"dynamic-graphs\"></a>\n",
"Another defining characteristic of \"task Monads\" is that they allow for \"dynamic dependency graphs\", where the information about dependencies between nodes (and even what nodes exist!) is only available \"at run-time\" (as other nodes are executed).\n",
"\n",
"Note that this doesn't mean all nodes in a given task graph leverage this dynamism, simply that it is possible.\n",
"\n",
"For example, imagine a workflow that involves performing some computation that outputs a number, and performing some other step that number of times (or using that many partitions); the latter task nodes are only created when execution of their ancestors has completed!\n",
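"\n",
"Here is a minimal sketch of that pattern with `Delayed`. The function names are hypothetical, and `count_partitions` hard-codes its result purely as a stand-in for a value that would really be computed at run time. The `process` tasks are only created inside `fan_out`, after `n` is known, so they are absent from `total`'s graph beforehand. Like `sequence` above, this relies on calling `.compute()` from inside a task:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dask import delayed\n",
"\n",
"@delayed\n",
"def count_partitions():\n",
"    # Hypothetical stand-in for a step whose output is only known at run time\n",
"    return 3\n",
"\n",
"@delayed\n",
"def process(i):\n",
"    return i * 10\n",
"\n",
"def fan_out(n):\n",
"    # Runs inside a task, once n is known: only now are the n `process` tasks created\n",
"    tasks = [process(i) for i in range(n)]\n",
"    return delayed(sum)(tasks).compute()\n",
"\n",
"total = delayed(fan_out)(count_partitions())\n",
"print(any(str(k).startswith('process') for k in dict(total.dask)))  # False: no process tasks yet\n",
"total.compute()  # 0 + 10 + 20 = 30"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [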
"\n",
"The implications of such dynamism are examined in the delightful [\"Build Systems à la Carte\" (Mokhov et al. 2018)](https://www.microsoft.com/en-us/research/uploads/prod/2018/03/build-systems-final.pdf), which is well worth a read.\n",
"\n",
"Since `Delayed` supports `flatMap`, Dask task graphs can in principle be entirely dynamic. Does this occur in the wild? I suspect it's rare, but it may be worth making this capability more explicit and streamlined.\n",
"\n",
"Thanks for reading!"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "dask-3.8.1",
"language": "python",
"name": "dask-3.8.1"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.1"
}
},
"nbformat": 4,
"nbformat_minor": 2
}