Created
March 7, 2024 09:11
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"id": "ec36ea2a-919b-4eb5-80af-bb53a467a87f", | |
"metadata": {}, | |
"source": [ | |
"# Map-style computation via SPMD using IPython Parallel Broadcast View\n", | |
"\n", | |
"In order to use BroadcastView efficiently, tasks must be able to be expressed as a single `apply` (or `execute`) call.\n", | |
"This is best done through [SPMD-style](https://en.wikipedia.org/wiki/Single_program,_multiple_data) tasks.\n", | |
"\n", | |
"If you've written code for MPI, this pattern ought to be familiar.\n", | |
"\n", | |
"The main thing that differs SPMD tasks is that each engine gets the same _code_ to execute,\n", | |
"but the results of what the execute depend on the _state_ of the process.\n", | |
"Typically the engine's _rank_ in the cluster and the cluster _size_, though it can get more complex.\n", | |
"\n", | |
"So rather than calling `map(func, inputs)`, you call `apply(map_func)`, where `map_func` computes a _partition_ and calls the map.\n", | |
"Computing the partition becomes part of the task itself." | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "09ead12d-1407-4f95-ae70-02788b941b29", | |
"metadata": {}, | |
"source": [ | |
"So the first thing we need is a function to compute the partition, given inputs, rank, and size.\n", | |
"Here is a very simple example partitioning function for one-dimensional sequences (e.g. lists)" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"id": "174f49d1-8ca2-4141-a174-ea6752f9c52f", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"def get_partition(n_items: int, rank: int, size: int)-> tuple[int, int]: \n", | |
" \"\"\"\n", | |
" Compute the partition\n", | |
"\n", | |
" Returns (start, end) of partition\n", | |
" \"\"\"\n", | |
" chunk_size = n_items // size\n", | |
" if n_items % size:\n", | |
" chunk_size += 1\n", | |
" start = rank * chunk_size\n", | |
" if rank + 1 == size:\n", | |
" end = n_items\n", | |
" else:\n", | |
" end = start + chunk_size\n", | |
" return (start, end)\n" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"id": "21fe2d6a-7d8f-45e5-acc8-83136a5c8598", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"3 0 (0, 4)\n", | |
"3 1 (4, 8)\n", | |
"3 2 (8, 10)\n", | |
"4 0 (0, 3)\n", | |
"4 1 (3, 6)\n", | |
"4 2 (6, 9)\n", | |
"4 3 (9, 10)\n", | |
"5 0 (0, 2)\n", | |
"5 1 (2, 4)\n", | |
"5 2 (4, 6)\n", | |
"5 3 (6, 8)\n", | |
"5 4 (8, 10)\n" | |
] | |
} | |
], | |
"source": [ | |
"n = 10\n", | |
"for size in (3, 4, 5):\n", | |
" for rank in range(size):\n", | |
" print(size, rank, get_partition(n, rank, size))" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "fc58de7b-5d9b-47c5-aa1d-81221b6aad42", | |
"metadata": {}, | |
"source": [ | |
"Now set up our fake workload.\n", | |
"\n", | |
"It is a bunch of random files.\n", | |
"For our purposes, 5 files per engine." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"id": "6434f095-aaf7-4944-aa85-86ba5208fdee", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import tempfile\n", | |
"from pathlib import Path\n", | |
"tmp_dir = tempfile.TemporaryDirectory()\n", | |
"tmp_path = Path(tmp_dir.name)\n", | |
"\n", | |
"n_engines = 100\n", | |
"tasks_per_engine = 5\n", | |
"n_items = n_engines * tasks_per_engine\n", | |
"\n", | |
"for i in range(n_items):\n", | |
" with (tmp_path / f\"file-{i:03}.txt\").open(\"wb\") as f:\n", | |
" f.write(os.urandom(1024))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"id": "4e095ce7-7ba7-41db-b0fa-fff13fd96ddf", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"500" | |
] | |
}, | |
"execution_count": 4, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"input_files = list(tmp_path.glob(\"*.txt\"))\n", | |
"len(input_files)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "88e1aa09-9bf5-4db5-ad38-c5b238355930", | |
"metadata": {}, | |
"source": [ | |
"Here's our task: compute the md5sum of the contents of one file" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"id": "5720da2a-8ff1-440c-92ef-505f86b18b84", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from hashlib import md5\n", | |
"\n", | |
"def compute_one(fname):\n", | |
" hasher = md5()\n", | |
" with open(fname, \"rb\") as f:\n", | |
" hasher.update(f.read())\n", | |
" return hasher.hexdigest()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"id": "9d6c99cf-8f15-49e2-ab68-e4896c2b9563", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"'73dd81c88fe62de4e7ab710aa04861a9'" | |
] | |
}, | |
"execution_count": 6, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"compute_one(input_files[0])" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"id": "334f5c89-41fb-44ab-8b5b-0c96e5e12624", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"CPU times: user 4.62 ms, sys: 7.45 ms, total: 12.1 ms\n", | |
"Wall time: 11.6 ms\n" | |
] | |
} | |
], | |
"source": [ | |
"%time local_result = list(map(compute_one, input_files))" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "1896df6d-e965-4336-b133-9c63ef7bbabd", | |
"metadata": {}, | |
"source": [ | |
"Now we need to define a task that takes as input:\n", | |
"\n", | |
"- tmp_path (same everywhere)\n", | |
"- rank (unique per engine)\n", | |
"- size (same everywhere)\n", | |
"\n", | |
"which will compute the same thing as computing a chunk of `map(compute_one, input_files)`" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"id": "88c82b8f-b39f-437f-bafe-39ff351828db", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"def spmd_task(tmp_path, rank, size):\n", | |
" # identify all inputs\n", | |
" all_input_files = list(Path(tmp_path).glob(\"*.txt\"))\n", | |
" # partition inputs\n", | |
" n_items = len(all_input_files)\n", | |
" start, end = get_partition(n_items, rank, size)\n", | |
" my_input_files = all_input_files[start:end]\n", | |
" # compute result\n", | |
" return list(map(compute_one, my_input_files))\n", | |
" " | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"id": "02b4e24d-6bf8-48a3-9775-74cf33eeac2b", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"['73dd81c88fe62de4e7ab710aa04861a9', '162bcd77dff3ff2fefebdb951d6b87cf']\n", | |
"['588950f656a840423c936281eca794b5', '2fc1db292ee57bc003f5abf311a487f8']\n", | |
"['4a0e249263a6695ec780881634dccfd7', '3f1594cfb346502818c98bd22e14f783']\n", | |
"['54c61fc576a7e8ca7d0af4d14257d7a5', '1e8a9deda60a1a7218469fb67ba041fd']\n", | |
"['6b4a782e39c61ee9125dbf831718fea8', '3ff2c6e4c276cc8f9927e5a457d4872a']\n" | |
] | |
} | |
], | |
"source": [ | |
"for rank in range(5):\n", | |
" print(spmd_task(tmp_path, rank, n_items // 2))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"id": "bf01e25a-4747-45a6-897e-3eb2423518f1", | |
"metadata": { | |
"scrolled": true | |
}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"['73dd81c88fe62de4e7ab710aa04861a9',\n", | |
" '162bcd77dff3ff2fefebdb951d6b87cf',\n", | |
" '588950f656a840423c936281eca794b5',\n", | |
" '2fc1db292ee57bc003f5abf311a487f8',\n", | |
" '4a0e249263a6695ec780881634dccfd7',\n", | |
" '3f1594cfb346502818c98bd22e14f783',\n", | |
" '54c61fc576a7e8ca7d0af4d14257d7a5',\n", | |
" '1e8a9deda60a1a7218469fb67ba041fd',\n", | |
" '6b4a782e39c61ee9125dbf831718fea8',\n", | |
" '3ff2c6e4c276cc8f9927e5a457d4872a']" | |
] | |
}, | |
"execution_count": 10, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"local_result[:10]" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "dcfeaf1f-5138-4ce6-8a67-2ed030d7dd08", | |
"metadata": {}, | |
"source": [ | |
"Now it's time to do it in parallel" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"id": "5b3b3e25-cb59-4d5a-8296-c98aec512ddf", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"application/vnd.jupyter.widget-view+json": { | |
"model_id": "6c86b71fac9949749044fe193b039357", | |
"version_major": 2, | |
"version_minor": 0 | |
}, | |
"text/plain": [ | |
" 67%|######7 | 67/100 [00:00<?, ?engine/s]" | |
] | |
}, | |
"metadata": {}, | |
"output_type": "display_data" | |
} | |
], | |
"source": [ | |
"import logging\n", | |
"import ipyparallel as ipp\n", | |
"\n", | |
"try:\n", | |
" # stop previous cluster, if we're re-running cells\n", | |
" cluster.stop_cluster_sync()\n", | |
"except NameError:\n", | |
" pass\n", | |
"\n", | |
"cluster = ipp.Cluster(n=n_engines, log_level=logging.WARNING)\n", | |
"rc = cluster.start_and_connect_sync()" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"id": "0be69cf6-9c19-4d9f-b825-369b21d0e0bf", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"broadcast_view = rc.broadcast_view()" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "8eb7b765-7fe9-40d9-805d-2b9331b7edf1", | |
"metadata": {}, | |
"source": [ | |
"Distribute rank and size.\n", | |
"This is unnecessary if engines are created with MPI." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 13, | |
"id": "a28b6688-0bd0-4aa3-b773-5225d45c7430", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"broadcast_view.scatter(\"rank\", rc.ids, flatten=True)\n", | |
"broadcast_view[\"size\"] = size = len(rc)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "bf5ee967-2182-4319-b787-c120dc07f874", | |
"metadata": {}, | |
"source": [ | |
"enable cloudpickle, which handles imports;\n", | |
"we could also explicitly push everything we are going to use." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 14, | |
"id": "422208f1-01ef-41f9-a1e7-d10379c0fbde", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"broadcast_view.use_cloudpickle().get();" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "081882db-5c5a-4137-9fba-5cf6deb83168", | |
"metadata": {}, | |
"source": [ | |
"We can now send this SPMD task as a _single_ task on all engines,\n", | |
"each of which will compute its own partition as part of the task and do its work:" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 15, | |
"id": "0b563c53-2ba5-4fab-b0a3-65953acd9341", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"ar = broadcast_view.apply(spmd_task, tmp_path, ipp.Reference(\"rank\"), size)" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"id": "a8d74569-017d-4e3f-b32a-c5b0f288e4d6", | |
"metadata": {}, | |
"source": [ | |
"Finally, we can reconstruct the result:\n", | |
"\n", | |
"Because we called `apply`, the result is a list of lists, when we want a single flat sequence.\n", | |
"`itertools.chain` takes care of that!" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"id": "22f24204-b7b7-4586-b927-5ac0e21d13c3", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"[['73dd81c88fe62de4e7ab710aa04861a9',\n", | |
" '162bcd77dff3ff2fefebdb951d6b87cf',\n", | |
" '588950f656a840423c936281eca794b5',\n", | |
" '2fc1db292ee57bc003f5abf311a487f8',\n", | |
" '4a0e249263a6695ec780881634dccfd7'],\n", | |
" ['3f1594cfb346502818c98bd22e14f783',\n", | |
" '54c61fc576a7e8ca7d0af4d14257d7a5',\n", | |
" '1e8a9deda60a1a7218469fb67ba041fd',\n", | |
" '6b4a782e39c61ee9125dbf831718fea8',\n", | |
" '3ff2c6e4c276cc8f9927e5a457d4872a']]" | |
] | |
}, | |
"execution_count": 16, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"ar.get()[:2]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"id": "e49aee9e-79e1-47d3-b5f8-5f6eedb8fa61", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"from itertools import chain\n", | |
"\n", | |
"parallel_result = list(chain(*ar))" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 18, | |
"id": "7d9afaf3-7293-4b0f-86af-0e14610d36d0", | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"(['73dd81c88fe62de4e7ab710aa04861a9',\n", | |
" '162bcd77dff3ff2fefebdb951d6b87cf',\n", | |
" '588950f656a840423c936281eca794b5',\n", | |
" '2fc1db292ee57bc003f5abf311a487f8',\n", | |
" '4a0e249263a6695ec780881634dccfd7',\n", | |
" '3f1594cfb346502818c98bd22e14f783'],\n", | |
" ['73dd81c88fe62de4e7ab710aa04861a9',\n", | |
" '162bcd77dff3ff2fefebdb951d6b87cf',\n", | |
" '588950f656a840423c936281eca794b5',\n", | |
" '2fc1db292ee57bc003f5abf311a487f8',\n", | |
" '4a0e249263a6695ec780881634dccfd7',\n", | |
" '3f1594cfb346502818c98bd22e14f783'])" | |
] | |
}, | |
"execution_count": 18, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"parallel_result[:6], local_result[:6]" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 19, | |
"id": "d9cf2c5d-c09a-4174-8742-5ae94ef79368", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"assert parallel_result == local_result" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 20, | |
"id": "946dddcd-c67c-4d56-8eb1-b09158e6b8aa", | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"cluster.stop_cluster_sync()" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3 (ipykernel)", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.10.13" | |
}, | |
"widgets": { | |
"application/vnd.jupyter.widget-state+json": { | |
"state": { | |
"143be504fe5441689eca96ac9ccec493": { | |
"model_module": "@jupyter-widgets/controls", | |
"model_module_version": "2.0.0", | |
"model_name": "HTMLModel", | |
"state": { | |
"layout": "IPY_MODEL_747e3490395a479b83bc14fe365e238d", | |
"style": "IPY_MODEL_d13ec3038c254ed0851b60516977e14c", | |
"value": " 100/100 [00:02<00:00, 14.64engine/s]" | |
} | |
}, | |
"40f0e04e1fa746f2b1a27d0e76b8985b": { | |
"model_module": "@jupyter-widgets/controls", | |
"model_module_version": "2.0.0", | |
"model_name": "HTMLModel", | |
"state": { | |
"layout": "IPY_MODEL_6be4ef4b2d2e49dd94d9da4f1780b7b8", | |
"style": "IPY_MODEL_56c5c380f3a140f785c88a734f7a862d", | |
"value": "100%" | |
} | |
}, | |
"483ad719ffc6415e98959095c86adaa0": { | |
"model_module": "@jupyter-widgets/controls", | |
"model_module_version": "2.0.0", | |
"model_name": "FloatProgressModel", | |
"state": { | |
"bar_style": "success", | |
"layout": "IPY_MODEL_b7fdc5b1ff9c412bb2b07a482740ba7d", | |
"style": "IPY_MODEL_b49989fd5ae04c888aa241348c74beb8", | |
"value": 100 | |
} | |
}, | |
"56c5c380f3a140f785c88a734f7a862d": { | |
"model_module": "@jupyter-widgets/controls", | |
"model_module_version": "2.0.0", | |
"model_name": "HTMLStyleModel", | |
"state": { | |
"description_width": "", | |
"font_size": null, | |
"text_color": null | |
} | |
}, | |
"6be4ef4b2d2e49dd94d9da4f1780b7b8": { | |
"model_module": "@jupyter-widgets/base", | |
"model_module_version": "2.0.0", | |
"model_name": "LayoutModel", | |
"state": {} | |
}, | |
"6c86b71fac9949749044fe193b039357": { | |
"model_module": "@jupyter-widgets/controls", | |
"model_module_version": "2.0.0", | |
"model_name": "HBoxModel", | |
"state": { | |
"children": [ | |
"IPY_MODEL_40f0e04e1fa746f2b1a27d0e76b8985b", | |
"IPY_MODEL_483ad719ffc6415e98959095c86adaa0", | |
"IPY_MODEL_143be504fe5441689eca96ac9ccec493" | |
], | |
"layout": "IPY_MODEL_73c80f3941e7457b89e9065bf637ff3f" | |
} | |
}, | |
"73c80f3941e7457b89e9065bf637ff3f": { | |
"model_module": "@jupyter-widgets/base", | |
"model_module_version": "2.0.0", | |
"model_name": "LayoutModel", | |
"state": {} | |
}, | |
"747e3490395a479b83bc14fe365e238d": { | |
"model_module": "@jupyter-widgets/base", | |
"model_module_version": "2.0.0", | |
"model_name": "LayoutModel", | |
"state": {} | |
}, | |
"b49989fd5ae04c888aa241348c74beb8": { | |
"model_module": "@jupyter-widgets/controls", | |
"model_module_version": "2.0.0", | |
"model_name": "ProgressStyleModel", | |
"state": { | |
"description_width": "" | |
} | |
}, | |
"b7fdc5b1ff9c412bb2b07a482740ba7d": { | |
"model_module": "@jupyter-widgets/base", | |
"model_module_version": "2.0.0", | |
"model_name": "LayoutModel", | |
"state": {} | |
}, | |
"d13ec3038c254ed0851b60516977e14c": { | |
"model_module": "@jupyter-widgets/controls", | |
"model_module_version": "2.0.0", | |
"model_name": "HTMLStyleModel", | |
"state": { | |
"description_width": "", | |
"font_size": null, | |
"text_color": null | |
} | |
} | |
}, | |
"version_major": 2, | |
"version_minor": 0 | |
} | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |
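
The partition-then-map pattern used in the notebook can be tried without starting a cluster. The following is a minimal sketch, not part of the original notebook: `simulate_spmd_map` is a hypothetical helper that loops over ranks in a single process instead of broadcasting to engines, and this variant of `get_partition` clamps `start` and `end` to `n_items`. The clamping is an assumption added here so that the bounds stay valid when there are more ranks than items; the notebook's version relies on slicing to tolerate out-of-range bounds.

```python
from itertools import chain


def get_partition(n_items: int, rank: int, size: int) -> tuple[int, int]:
    """Return (start, end) of the slice owned by `rank`.

    Assumption for this sketch: bounds are clamped to n_items,
    so surplus ranks receive an empty (n_items, n_items) range.
    """
    chunk_size = -(-n_items // size)  # ceiling division
    start = min(rank * chunk_size, n_items)
    end = min(start + chunk_size, n_items)
    return (start, end)


def spmd_map(func, inputs, rank, size):
    """What a single rank does: compute its partition, then map over it."""
    start, end = get_partition(len(inputs), rank, size)
    return [func(item) for item in inputs[start:end]]


def simulate_spmd_map(func, inputs, size):
    """Hypothetical helper: run every rank in this process and flatten.

    The per-rank lists play the role of the list of lists that
    `apply` returns, and `chain` flattens them in rank order.
    """
    per_rank = [spmd_map(func, inputs, rank, size) for rank in range(size)]
    return list(chain.from_iterable(per_rank))


if __name__ == "__main__":
    inputs = list(range(10))
    for size in (3, 4, 8):
        # the flattened SPMD result should equal a plain map
        assert simulate_spmd_map(str, inputs, size) == list(map(str, inputs))
        print(size, [get_partition(len(inputs), r, size) for r in range(size)])
```

With 10 items, sizes 3 and 4 give the same partitions as the ones printed in the notebook, for example `(8, 10)` for rank 2 of 3.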