{
"cells": [
{
"cell_type": "markdown",
"id": "ec36ea2a-919b-4eb5-80af-bb53a467a87f",
"metadata": {},
"source": [
"# Map-style computation via SPMD using IPython Parallel Broadcast View\n",
"\n",
"In order to use BroadcastView efficiently, tasks must be able to be expressed as a single `apply` (or `execute`) call.\n",
"This is best done through [SPMD-style](https://en.wikipedia.org/wiki/Single_program,_multiple_data) tasks.\n",
"\n",
"If you've written code for MPI, this pattern ought to be familiar.\n",
"\n",
"The main thing that differs SPMD tasks is that each engine gets the same _code_ to execute,\n",
"but the results of what the execute depend on the _state_ of the process.\n",
"Typically the engine's _rank_ in the cluster and the cluster _size_, though it can get more complex.\n",
"\n",
"So rather than calling `map(func, inputs)`, you call `apply(map_func)`, where `map_func` computes a _partition_ and calls the map.\n",
"Computing the partition becomes part of the task itself."
]
},
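{
"cell_type": "markdown",
"id": "1a2b3c4d-5e6f-4a7b-8c9d-0e1f2a3b4c5d",
"metadata": {},
"source": [
"As a minimal sketch of the pattern (with placeholder names `func` and `inputs`, not code from this notebook):\n",
"\n",
"```python\n",
"# instead of calling view.map(func, inputs) from the client,\n",
"# every engine runs the same function...\n",
"def map_func(inputs, rank, size):\n",
"    my_inputs = inputs[rank::size]  # ...but selects its own slice from rank and size\n",
"    return [func(x) for x in my_inputs]\n",
"\n",
"# view.apply(map_func, ...) then yields one partial result per engine\n",
"```\n",
"\n",
"The rest of this notebook builds a complete, working version of this idea."
]
},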
{
"cell_type": "markdown",
"id": "09ead12d-1407-4f95-ae70-02788b941b29",
"metadata": {},
"source": [
"So the first thing we need is a function to compute the partition, given inputs, rank, and size.\n",
"Here is a very simple example partitioning function for one-dimensional sequences (e.g. lists)"
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "174f49d1-8ca2-4141-a174-ea6752f9c52f",
"metadata": {},
"outputs": [],
"source": [
"def get_partition(n_items: int, rank: int, size: int)-> tuple[int, int]: \n",
" \"\"\"\n",
" Compute the partition\n",
"\n",
" Returns (start, end) of partition\n",
" \"\"\"\n",
" chunk_size = n_items // size\n",
" if n_items % size:\n",
" chunk_size += 1\n",
" start = rank * chunk_size\n",
" if rank + 1 == size:\n",
" end = n_items\n",
" else:\n",
" end = start + chunk_size\n",
" return (start, end)\n"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "21fe2d6a-7d8f-45e5-acc8-83136a5c8598",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"3 0 (0, 4)\n",
"3 1 (4, 8)\n",
"3 2 (8, 10)\n",
"4 0 (0, 3)\n",
"4 1 (3, 6)\n",
"4 2 (6, 9)\n",
"4 3 (9, 10)\n",
"5 0 (0, 2)\n",
"5 1 (2, 4)\n",
"5 2 (4, 6)\n",
"5 3 (6, 8)\n",
"5 4 (8, 10)\n"
]
}
],
"source": [
"n = 10\n",
"for size in (3, 4, 5):\n",
" for rank in range(size):\n",
" print(size, rank, get_partition(n, rank, size))"
]
},
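{
"cell_type": "markdown",
"id": "7d41f0aa-2c3b-4e5d-9f6a-8b7c6d5e4f3a",
"metadata": {},
"source": [
"A quick sanity check one could run here (a sketch, not part of the original run): concatenating every rank's slice should reproduce the full input, with no gaps or overlaps.\n",
"\n",
"```python\n",
"items = list(range(10))\n",
"for size in (3, 4, 5):\n",
"    recombined = []\n",
"    for rank in range(size):\n",
"        start, end = get_partition(len(items), rank, size)\n",
"        recombined.extend(items[start:end])\n",
"    # each item appears exactly once, in order\n",
"    assert recombined == items\n",
"```"
]
},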
{
"cell_type": "markdown",
"id": "fc58de7b-5d9b-47c5-aa1d-81221b6aad42",
"metadata": {},
"source": [
"Now set up our fake workload.\n",
"\n",
"It is a bunch of random files.\n",
"For our purposes, 5 files per engine."
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "6434f095-aaf7-4944-aa85-86ba5208fdee",
"metadata": {},
"outputs": [],
"source": [
"import tempfile\n",
"from pathlib import Path\n",
"tmp_dir = tempfile.TemporaryDirectory()\n",
"tmp_path = Path(tmp_dir.name)\n",
"\n",
"n_engines = 100\n",
"tasks_per_engine = 5\n",
"n_items = n_engines * tasks_per_engine\n",
"\n",
"for i in range(n_items):\n",
" with (tmp_path / f\"file-{i:03}.txt\").open(\"wb\") as f:\n",
" f.write(os.urandom(1024))"
]
},
{
"cell_type": "code",
"execution_count": 4,
"id": "4e095ce7-7ba7-41db-b0fa-fff13fd96ddf",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"500"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"input_files = list(tmp_path.glob(\"*.txt\"))\n",
"len(input_files)"
]
},
{
"cell_type": "markdown",
"id": "88e1aa09-9bf5-4db5-ad38-c5b238355930",
"metadata": {},
"source": [
"Here's our task: compute the md5sum of the contents of one file"
]
},
{
"cell_type": "code",
"execution_count": 5,
"id": "5720da2a-8ff1-440c-92ef-505f86b18b84",
"metadata": {},
"outputs": [],
"source": [
"from hashlib import md5\n",
"\n",
"def compute_one(fname):\n",
" hasher = md5()\n",
" with open(fname, \"rb\") as f:\n",
" hasher.update(f.read())\n",
" return hasher.hexdigest()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "9d6c99cf-8f15-49e2-ab68-e4896c2b9563",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'73dd81c88fe62de4e7ab710aa04861a9'"
]
},
"execution_count": 6,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"compute_one(input_files[0])"
]
},
{
"cell_type": "code",
"execution_count": 7,
"id": "334f5c89-41fb-44ab-8b5b-0c96e5e12624",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 4.62 ms, sys: 7.45 ms, total: 12.1 ms\n",
"Wall time: 11.6 ms\n"
]
}
],
"source": [
"%time local_result = list(map(compute_one, input_files))"
]
},
{
"cell_type": "markdown",
"id": "1896df6d-e965-4336-b133-9c63ef7bbabd",
"metadata": {},
"source": [
"Now we need to define a task that takes as input:\n",
"\n",
"- tmp_path (same everywhere)\n",
"- rank (unique per engine)\n",
"- size (same everywhere)\n",
"\n",
"which will compute the same thing as computing a chunk of `map(compute_one, input_files)`"
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "88c82b8f-b39f-437f-bafe-39ff351828db",
"metadata": {},
"outputs": [],
"source": [
"def spmd_task(tmp_path, rank, size):\n",
" # identify all inputs\n",
" all_input_files = list(Path(tmp_path).glob(\"*.txt\"))\n",
" # partition inputs\n",
" n_items = len(all_input_files)\n",
" start, end = get_partition(n_items, rank, size)\n",
" my_input_files = all_input_files[start:end]\n",
" # compute result\n",
" return list(map(compute_one, my_input_files))\n",
" "
]
},
{
"cell_type": "code",
"execution_count": 9,
"id": "02b4e24d-6bf8-48a3-9775-74cf33eeac2b",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"['73dd81c88fe62de4e7ab710aa04861a9', '162bcd77dff3ff2fefebdb951d6b87cf']\n",
"['588950f656a840423c936281eca794b5', '2fc1db292ee57bc003f5abf311a487f8']\n",
"['4a0e249263a6695ec780881634dccfd7', '3f1594cfb346502818c98bd22e14f783']\n",
"['54c61fc576a7e8ca7d0af4d14257d7a5', '1e8a9deda60a1a7218469fb67ba041fd']\n",
"['6b4a782e39c61ee9125dbf831718fea8', '3ff2c6e4c276cc8f9927e5a457d4872a']\n"
]
}
],
"source": [
"for rank in range(5):\n",
" print(spmd_task(tmp_path, rank, n_items // 2))"
]
},
{
"cell_type": "code",
"execution_count": 10,
"id": "bf01e25a-4747-45a6-897e-3eb2423518f1",
"metadata": {
"scrolled": true
},
"outputs": [
{
"data": {
"text/plain": [
"['73dd81c88fe62de4e7ab710aa04861a9',\n",
" '162bcd77dff3ff2fefebdb951d6b87cf',\n",
" '588950f656a840423c936281eca794b5',\n",
" '2fc1db292ee57bc003f5abf311a487f8',\n",
" '4a0e249263a6695ec780881634dccfd7',\n",
" '3f1594cfb346502818c98bd22e14f783',\n",
" '54c61fc576a7e8ca7d0af4d14257d7a5',\n",
" '1e8a9deda60a1a7218469fb67ba041fd',\n",
" '6b4a782e39c61ee9125dbf831718fea8',\n",
" '3ff2c6e4c276cc8f9927e5a457d4872a']"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"local_result[:10]"
]
},
{
"cell_type": "markdown",
"id": "dcfeaf1f-5138-4ce6-8a67-2ed030d7dd08",
"metadata": {},
"source": [
"Now it's time to do it in parallel"
]
},
{
"cell_type": "code",
"execution_count": 11,
"id": "5b3b3e25-cb59-4d5a-8296-c98aec512ddf",
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "6c86b71fac9949749044fe193b039357",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
" 67%|######7 | 67/100 [00:00<?, ?engine/s]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"import logging\n",
"import ipyparallel as ipp\n",
"\n",
"try:\n",
" # stop previous cluster, if we're re-running cells\n",
" cluster.stop_cluster_sync()\n",
"except NameError:\n",
" pass\n",
"\n",
"cluster = ipp.Cluster(n=n_engines, log_level=logging.WARNING)\n",
"rc = cluster.start_and_connect_sync()"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "0be69cf6-9c19-4d9f-b825-369b21d0e0bf",
"metadata": {},
"outputs": [],
"source": [
"broadcast_view = rc.broadcast_view()"
]
},
{
"cell_type": "markdown",
"id": "8eb7b765-7fe9-40d9-805d-2b9331b7edf1",
"metadata": {},
"source": [
"Distribute rank and size.\n",
"This is unnecessary if engines are created with MPI."
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "a28b6688-0bd0-4aa3-b773-5225d45c7430",
"metadata": {},
"outputs": [],
"source": [
"broadcast_view.scatter(\"rank\", rc.ids, flatten=True)\n",
"broadcast_view[\"size\"] = size = len(rc)"
]
},
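{
"cell_type": "markdown",
"id": "5e9b1c2d-3a4f-4b6c-8d7e-9f0a1b2c3d4e",
"metadata": {},
"source": [
"For reference, if the engines had been started with MPI (e.g. `ipp.Cluster(engines=\"mpi\", ...)`), each engine could look up its own rank and size locally instead of having them scattered from the client. A sketch of what would run on each engine:\n",
"\n",
"```python\n",
"# runs on each engine, e.g. via broadcast_view.execute(...)\n",
"from mpi4py import MPI\n",
"\n",
"rank = MPI.COMM_WORLD.rank  # this engine's rank\n",
"size = MPI.COMM_WORLD.size  # total number of engines\n",
"```"
]
},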
{
"cell_type": "markdown",
"id": "bf5ee967-2182-4319-b787-c120dc07f874",
"metadata": {},
"source": [
"enable cloudpickle, which handles imports;\n",
"we could also explicitly push everything we are going to use."
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "422208f1-01ef-41f9-a1e7-d10379c0fbde",
"metadata": {},
"outputs": [],
"source": [
"broadcast_view.use_cloudpickle().get();"
]
},
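{
"cell_type": "markdown",
"id": "9c8b7a6d-5e4f-4d3c-b2a1-0f9e8d7c6b5a",
"metadata": {},
"source": [
"Without cloudpickle, the explicit alternative would look roughly like this (a sketch, not executed here): push the helper functions into the engine namespaces and run the imports they rely on.\n",
"\n",
"```python\n",
"# send the functions the task depends on to every engine\n",
"broadcast_view.push({\"get_partition\": get_partition, \"compute_one\": compute_one}, block=True)\n",
"# run the imports those functions need on the engines\n",
"broadcast_view.execute(\"from pathlib import Path; from hashlib import md5\", block=True)\n",
"```"
]
},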
{
"cell_type": "markdown",
"id": "081882db-5c5a-4137-9fba-5cf6deb83168",
"metadata": {},
"source": [
"We can now send this SPMD task as a _single_ task on all engines,\n",
"each of which will compute its own partition as part of the task and do its work:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"id": "0b563c53-2ba5-4fab-b0a3-65953acd9341",
"metadata": {},
"outputs": [],
"source": [
"ar = broadcast_view.apply(spmd_task, tmp_path, ipp.Reference(\"rank\"), size)"
]
},
{
"cell_type": "markdown",
"id": "a8d74569-017d-4e3f-b32a-c5b0f288e4d6",
"metadata": {},
"source": [
"Finally, we can reconstruct the result:\n",
"\n",
"Because we called `apply`, the result is a list of lists, when we want a single flat sequence.\n",
"`itertools.chain` takes care of that!"
]
},
{
"cell_type": "code",
"execution_count": 16,
"id": "22f24204-b7b7-4586-b927-5ac0e21d13c3",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[['73dd81c88fe62de4e7ab710aa04861a9',\n",
" '162bcd77dff3ff2fefebdb951d6b87cf',\n",
" '588950f656a840423c936281eca794b5',\n",
" '2fc1db292ee57bc003f5abf311a487f8',\n",
" '4a0e249263a6695ec780881634dccfd7'],\n",
" ['3f1594cfb346502818c98bd22e14f783',\n",
" '54c61fc576a7e8ca7d0af4d14257d7a5',\n",
" '1e8a9deda60a1a7218469fb67ba041fd',\n",
" '6b4a782e39c61ee9125dbf831718fea8',\n",
" '3ff2c6e4c276cc8f9927e5a457d4872a']]"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"ar.get()[:2]"
]
},
{
"cell_type": "code",
"execution_count": 17,
"id": "e49aee9e-79e1-47d3-b5f8-5f6eedb8fa61",
"metadata": {},
"outputs": [],
"source": [
"from itertools import chain\n",
"\n",
"parallel_result = list(chain(*ar))"
]
},
{
"cell_type": "code",
"execution_count": 18,
"id": "7d9afaf3-7293-4b0f-86af-0e14610d36d0",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"(['73dd81c88fe62de4e7ab710aa04861a9',\n",
" '162bcd77dff3ff2fefebdb951d6b87cf',\n",
" '588950f656a840423c936281eca794b5',\n",
" '2fc1db292ee57bc003f5abf311a487f8',\n",
" '4a0e249263a6695ec780881634dccfd7',\n",
" '3f1594cfb346502818c98bd22e14f783'],\n",
" ['73dd81c88fe62de4e7ab710aa04861a9',\n",
" '162bcd77dff3ff2fefebdb951d6b87cf',\n",
" '588950f656a840423c936281eca794b5',\n",
" '2fc1db292ee57bc003f5abf311a487f8',\n",
" '4a0e249263a6695ec780881634dccfd7',\n",
" '3f1594cfb346502818c98bd22e14f783'])"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"parallel_result[:6], local_result[:6]"
]
},
{
"cell_type": "code",
"execution_count": 19,
"id": "d9cf2c5d-c09a-4174-8742-5ae94ef79368",
"metadata": {},
"outputs": [],
"source": [
"assert parallel_result == local_result"
]
},
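{
"cell_type": "markdown",
"id": "2f3e4d5c-6b7a-4980-91a2-b3c4d5e6f7a8",
"metadata": {},
"source": [
"Before shutting the cluster down, we could also remove the temporary input files (housekeeping only; the `TemporaryDirectory` is otherwise cleaned up when the object is garbage-collected or at interpreter exit):\n",
"\n",
"```python\n",
"tmp_dir.cleanup()  # delete the generated input files\n",
"```"
]
},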
{
"cell_type": "code",
"execution_count": 20,
"id": "946dddcd-c67c-4d56-8eb1-b09158e6b8aa",
"metadata": {},
"outputs": [],
"source": [
"cluster.stop_cluster_sync()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
},
"widgets": {
"application/vnd.jupyter.widget-state+json": {
"state": {
"143be504fe5441689eca96ac9ccec493": {
"model_module": "@jupyter-widgets/controls",
"model_module_version": "2.0.0",
"model_name": "HTMLModel",
"state": {
"layout": "IPY_MODEL_747e3490395a479b83bc14fe365e238d",
"style": "IPY_MODEL_d13ec3038c254ed0851b60516977e14c",
"value": " 100/100 [00:02&lt;00:00, 14.64engine/s]"
}
},
"40f0e04e1fa746f2b1a27d0e76b8985b": {
"model_module": "@jupyter-widgets/controls",
"model_module_version": "2.0.0",
"model_name": "HTMLModel",
"state": {
"layout": "IPY_MODEL_6be4ef4b2d2e49dd94d9da4f1780b7b8",
"style": "IPY_MODEL_56c5c380f3a140f785c88a734f7a862d",
"value": "100%"
}
},
"483ad719ffc6415e98959095c86adaa0": {
"model_module": "@jupyter-widgets/controls",
"model_module_version": "2.0.0",
"model_name": "FloatProgressModel",
"state": {
"bar_style": "success",
"layout": "IPY_MODEL_b7fdc5b1ff9c412bb2b07a482740ba7d",
"style": "IPY_MODEL_b49989fd5ae04c888aa241348c74beb8",
"value": 100
}
},
"56c5c380f3a140f785c88a734f7a862d": {
"model_module": "@jupyter-widgets/controls",
"model_module_version": "2.0.0",
"model_name": "HTMLStyleModel",
"state": {
"description_width": "",
"font_size": null,
"text_color": null
}
},
"6be4ef4b2d2e49dd94d9da4f1780b7b8": {
"model_module": "@jupyter-widgets/base",
"model_module_version": "2.0.0",
"model_name": "LayoutModel",
"state": {}
},
"6c86b71fac9949749044fe193b039357": {
"model_module": "@jupyter-widgets/controls",
"model_module_version": "2.0.0",
"model_name": "HBoxModel",
"state": {
"children": [
"IPY_MODEL_40f0e04e1fa746f2b1a27d0e76b8985b",
"IPY_MODEL_483ad719ffc6415e98959095c86adaa0",
"IPY_MODEL_143be504fe5441689eca96ac9ccec493"
],
"layout": "IPY_MODEL_73c80f3941e7457b89e9065bf637ff3f"
}
},
"73c80f3941e7457b89e9065bf637ff3f": {
"model_module": "@jupyter-widgets/base",
"model_module_version": "2.0.0",
"model_name": "LayoutModel",
"state": {}
},
"747e3490395a479b83bc14fe365e238d": {
"model_module": "@jupyter-widgets/base",
"model_module_version": "2.0.0",
"model_name": "LayoutModel",
"state": {}
},
"b49989fd5ae04c888aa241348c74beb8": {
"model_module": "@jupyter-widgets/controls",
"model_module_version": "2.0.0",
"model_name": "ProgressStyleModel",
"state": {
"description_width": ""
}
},
"b7fdc5b1ff9c412bb2b07a482740ba7d": {
"model_module": "@jupyter-widgets/base",
"model_module_version": "2.0.0",
"model_name": "LayoutModel",
"state": {}
},
"d13ec3038c254ed0851b60516977e14c": {
"model_module": "@jupyter-widgets/controls",
"model_module_version": "2.0.0",
"model_name": "HTMLStyleModel",
"state": {
"description_width": "",
"font_size": null,
"text_color": null
}
}
},
"version_major": 2,
"version_minor": 0
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}