@manodeep
Last active July 5, 2020 04:10
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Writing Parallel Code (Manodeep Sinha, Harley-Wood School, July 2020)\n",
"\n",
"The goal for this session is to introduce you to the distributed programming paradigm. We will discuss the differences between writing sequential (or serial) code and parallel code, and work through an example of parallel python code (using `mpi4py`). The key idea in this talk is to understand how to partition work into independent units and then use `mpi4py` to run those units in parallel.\n",
"\n",
"Attendee prerequisites:\n",
"- Familiarity with good coding practices (see the talks by Rebecca and Paul at this HWSA or at HWSA 2019)\n",
"- Familiarity with python programming\n",
"\n",
"Software prerequisites:\n",
"- python3.6+\n",
"- numpy\n",
"- matplotlib (for the plots)\n",
"- mpi4py\n",
"\n",
"\n",
"## Why you might want to write parallel code\n",
"\n",
"Standard python code runs sequentially: even with threads, the global interpreter lock in CPython lets only one thread execute python code at a time. While there are ways to run multi-threaded code through `C` extensions or `cython`, python code is, in general, sequential. Being able to run a job in parallel reduces the total time taken, which means you get to your science results sooner. \n",
"\n",
"One way of parallelising code is through the `multiprocessing` package; however, that only runs on a single computer and is limited by the total number of cores available on that computer (typically fewer than 100). Running a job in a distributed fashion (e.g., with MPI) lets you scale to hundreds or thousands of cores."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import matplotlib\n",
"matplotlib.rcParams.update({'font.size': 18})\n",
"matplotlib.rcParams.update({'xtick.direction': 'in'})\n",
"matplotlib.rcParams.update({'ytick.direction': 'in'})\n",
"import matplotlib.pyplot as plt"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"%matplotlib inline"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Let's look at some processing code\n",
"\n",
"Here we will count the number of lines in each file in a list of ASCII files."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"def count_lines_in_filenames(filenames):\n",
"    \"\"\"\n",
"    Returns a list containing the number of lines in the \n",
"    input (ascii) files \n",
"    \"\"\"\n",
"    \n",
"    def _count_lines(fname):\n",
"        with open(fname, 'rt') as f:\n",
"            count = 0\n",
"            for line in f:\n",
"                count += 1\n",
"        return count\n",
"    \n",
"    # Protect against the case where a single\n",
"    # file was passed\n",
"    if not isinstance(filenames, (list, tuple)):\n",
"        filenames = [filenames]\n",
"    \n",
"    numlines = []\n",
"    for fname in filenames:\n",
"        numlines.append(_count_lines(fname))\n",
"\n",
"    # or use list-comprehension\n",
"    # numlines = [_count_lines(fname) for fname in filenames]\n",
"\n",
"    return numlines"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### But what if you want to count lines a different way\n",
"\n",
"Easy enough: just add a second parameter that accepts a function. If this parameter is not specified, the internal `_count_lines` function is used."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def count_lines_in_filenames_external_func(filenames, line_counter_func=None):\n",
"    def _count_lines(fname):\n",
"        with open(fname, 'rt') as f:\n",
"            count = 0\n",
"            for line in f:\n",
"                count += 1\n",
"        return count\n",
"    \n",
"    if not line_counter_func: \n",
"        line_counter_func = _count_lines\n",
"    \n",
"    # Protect against the case where a single\n",
"    # file was passed\n",
"    if not isinstance(filenames, (list, tuple)):\n",
"        filenames = [filenames]\n",
"\n",
"    numlines = []\n",
"    for fname in filenames:\n",
"        numlines.append(line_counter_func(fname))\n",
"\n",
"    # or use list comprehension\n",
"    # numlines = [line_counter_func(fname) for fname in filenames]\n",
"\n",
"    return numlines"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Line-counting is simple, but how about an arbitrary function?\n",
"\n",
"So far we have used the simple example of counting lines, but you can easily extend the concept to any arbitrary processing of a file. If no external processing function is passed, then we will use the \"line-counting\" function."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"def arbitrary_processing(filenames, processing_func=None):\n",
"    def _count_lines(fname):\n",
"        with open(fname, 'rt') as f:\n",
"            count = 0\n",
"            for line in f:\n",
"                count += 1\n",
"        return count\n",
"\n",
"    if not processing_func: \n",
"        processing_func = _count_lines\n",
"    \n",
"    # Protect against the case where a single\n",
"    # file was passed\n",
"    if not isinstance(filenames, (list, tuple)):\n",
"        filenames = [filenames]\n",
"    \n",
"    results = []\n",
"    for fname in filenames:\n",
"        results.append(processing_func(fname))\n",
"\n",
"    # or use list comprehension\n",
"    # results = [processing_func(fname) for fname in filenames] \n",
"\n",
"    return results"
]
},
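{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a sketch of passing a custom processing function, the self-contained snippet below uses a hypothetical `count_words` function (not part of the original notebook). `process_files` is a stripped-down stand-in for `arbitrary_processing`, with the same single-file guard and loop but no default function:\n",
"\n",
"```python\n",
"import os\n",
"import tempfile\n",
"\n",
"def count_words(fname):\n",
"    # Hypothetical processing function: number of whitespace-separated words\n",
"    with open(fname, 'rt') as f:\n",
"        return sum(len(line.split()) for line in f)\n",
"\n",
"def process_files(filenames, processing_func):\n",
"    # Same single-file guard and loop as arbitrary_processing\n",
"    if not isinstance(filenames, (list, tuple)):\n",
"        filenames = [filenames]\n",
"    return [processing_func(fname) for fname in filenames]\n",
"\n",
"# Write a small example file; each print() call adds one line\n",
"tmpdir = tempfile.mkdtemp()\n",
"fname = os.path.join(tmpdir, 'example.txt')\n",
"with open(fname, 'wt') as f:\n",
"    print('one two three', file=f)\n",
"    print('four five', file=f)\n",
"\n",
"word_counts = process_files(fname, count_words)\n",
"print(word_counts)  # [5]\n",
"```\n",
"\n",
"Passing a single filename works because of the `isinstance` guard: without it, the loop would iterate over the individual characters of the filename string."
]
},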
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<hr style=\"border:2px solid gray\"> </hr>\n",
"\n",
"# Basic Concepts of Distributed Computing\n",
"\n",
"\n",
"* What information is shared\n",
"* Which instruction runs when\n",
"* Which task does what\n",
"* Where is the output produced\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<hr style=\"border:2px solid gray\"> </hr>\n",
"\n",
"\n",
"## What information is shared\n",
"\n",
"While programming, we are always working with variables. Generally speaking, there are three kinds of variables:\n",
"\n",
"1. local variables - variables defined at (or near) the code location where they are used\n",
"2. global variables - variables that are explicitly defined to be available to all functions\n",
"3. function parameters - variables that are explicitly passed as arguments to a function\n",
"\n",
"In sequential code, a variable that is required somewhere can be made available by changing how it is declared (e.g., making it global, or passing it as a function parameter). \n",
"\n",
"In multiprocessing code, all variable values are shared up to the point where the multiprocessing call is made. After that point, any modifications to a variable in one process are no longer visible to the other processes.\n",
"\n",
"In distributed code, variables on the same process follow the rules of sequential code; however, obtaining the value of a variable that lives on a different process requires an explicit transfer (i.e., a message sent between the processes). \n",
"\n",
"----"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<hr style=\"border:2px solid gray\"> </hr>\n",
"\n",
"\n",
"## Which instruction runs when\n",
"\n",
"In sequential code, the lines of code are executed one after another; lines that occur earlier are guaranteed to be executed before any later lines. \n",
"\n",
"In distributed code, each task runs sequentially but, by default, there is no connection between the execution states of the different tasks. For instance, one task may have completed before another task has even started. If your problem requires a guarantee that all tasks have reached a certain point in the code, you will have to program these \"synchronisation\" points (or barriers) explicitly. When a task reaches such a synchronisation point, it waits there until **every** task has reached the same point. \n",
"\n",
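"The synchronisation point described above can be sketched with Python's built-in `threading.Barrier`. As an assumption for illustration, threads stand in for the distributed tasks (in `mpi4py`, the corresponding call is `Barrier()` on a communicator such as `MPI.COMM_WORLD`). Each thread sleeps for a random time to mimic an uneven workload before reaching the barrier:\n",
"\n",
"```python\n",
"import random\n",
"import threading\n",
"import time\n",
"\n",
"ntasks = 4\n",
"barrier = threading.Barrier(ntasks)\n",
"events = []                    # (phase, rank) in the order they happen\n",
"events_lock = threading.Lock()\n",
"\n",
"def task(rank):\n",
"    # Uneven amount of work before the synchronisation point\n",
"    time.sleep(random.uniform(0.0, 0.2))\n",
"    with events_lock:\n",
"        events.append(('before', rank))\n",
"    # Wait here until *every* task has reached this point\n",
"    barrier.wait()\n",
"    with events_lock:\n",
"        events.append(('after', rank))\n",
"\n",
"threads = [threading.Thread(target=task, args=(rank,)) for rank in range(ntasks)]\n",
"for t in threads:\n",
"    t.start()\n",
"for t in threads:\n",
"    t.join()\n",
"\n",
"# All 'before' entries come first, whatever the sleep times were\n",
"phases = [phase for phase, _ in events]\n",
"print(phases)\n",
"```\n",
"\n",
"However long each thread sleeps, no thread records its `after` event until all of them have recorded `before`.\n",
"\n",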
"Think of it as a 400m race where the sprinters (\"tasks\") can be anyone from Florence Griffith Joyner to Usain Bolt to you and me. Without additional gates and barriers, some of the sprinters will clearly finish much earlier than others; with a barrier, the fastest sprinters simply wait for the slowest. This is why you need to partition the work such that every task takes a similar amount of time to finish its workload.\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<hr style=\"border:2px solid gray\"> </hr>\n",
"\n",
"\n",
"## Which task does what\n",
"\n",
"In a sequential code, there is only one task and that task performs the entirety of the computation.\n",
"In a distributed code, you will have to explicitly partition the entire computation over the available tasks. While partitioning, you will need to make sure of the following:\n",
"\n",
"- The entirety of the computation is done\n",
"- Every unique work unit is performed by exactly one task (i.e., no redundant work between multiple tasks)\n",
"- The results from each work unit are collected at a unique location, either in memory or on the hard-drive (e.g., task 0 does not overwrite the results from task 1)\n",
" \n",
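"The first two requirements can be checked mechanically (the third, a unique output location for every result, concerns where output goes and is not checked here). A minimal sketch, assuming a partition is described by a list with one list of work-unit indices per task; `check_partition` is a hypothetical helper, not part of `mpi4py` or numpy:\n",
"\n",
"```python\n",
"def check_partition(task_indices, nwork):\n",
"    # task_indices[t] holds the work-unit indices assigned to task t\n",
"    all_indices = [i for indices in task_indices for i in indices]\n",
"    # The entire computation is done: every unit appears at least once\n",
"    complete = set(all_indices) == set(range(nwork))\n",
"    # No redundant work: no unit appears more than once\n",
"    unique = len(all_indices) == len(set(all_indices))\n",
"    return complete and unique\n",
"\n",
"good = [[0, 3], [1, 4], [2]]\n",
"missing = [[0, 3], [1], [2]]             # unit 4 is never processed\n",
"duplicated = [[0, 3], [1, 3, 4], [2]]    # unit 3 is processed twice\n",
"\n",
"print(check_partition(good, 5))        # True\n",
"print(check_partition(missing, 5))     # False\n",
"print(check_partition(duplicated, 5))  # False\n",
"```\n",
"\n",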
"-----\n",
"<details><summary>Advanced Section (click to expand)</summary>\n",
" \n",
" Partitioning the entire computation for distributed computing also needs to account for the following edge-cases:\n",
"\n",
" - What happens if there are fewer work units than tasks? You will have to consider whether you can split your work units even further (i.e., more fine-grained parallelism)\n",
" - What happens if the individual work units take different amounts of time? You will have to estimate the workload for each work unit, and then assign work units based on the total workload per task. Another strategy is to distribute the first `ntasks` work units over the available tasks, and then hand out each remaining work unit to whichever task finishes its previously assigned work unit first. \n",
"\n",
"</details>\n",
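"\n",
"The second strategy in the advanced section above (hand out the first `ntasks` work units, then give each remaining unit to whichever task finishes first) can be simulated without any MPI calls. A minimal sketch, assuming the computing time of every work unit is known in advance; `dynamic_schedule` is a hypothetical name and the `costs` values are made up for illustration:\n",
"\n",
"```python\n",
"import heapq\n",
"\n",
"def dynamic_schedule(costs, ntasks):\n",
"    # Each work unit, in order, goes to the task that becomes free first.\n",
"    # Returns the task assigned to each unit and the busy time of each task.\n",
"    free_at = [(0.0, task) for task in range(ntasks)]  # (time task is free, task)\n",
"    heapq.heapify(free_at)\n",
"    assigned = []\n",
"    busy = [0.0] * ntasks\n",
"    for cost in costs:\n",
"        t_free, task = heapq.heappop(free_at)\n",
"        assigned.append(task)\n",
"        busy[task] += cost\n",
"        heapq.heappush(free_at, (t_free + cost, task))\n",
"    return assigned, busy\n",
"\n",
"costs = [5.0, 1.0, 1.0, 1.0, 1.0, 1.0]\n",
"assigned, busy = dynamic_schedule(costs, ntasks=2)\n",
"print(assigned)  # [0, 1, 1, 1, 1, 1]\n",
"print(busy)      # [5.0, 5.0]\n",
"```\n",
"\n",
"For comparison, Pattern 1 (described below) would give task 0 the units 0, 2 and 4 (7.0 time units in total) and task 1 only 3.0, leaving task 1 idle for more than half of the run.\n",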
"\n",
"-----\n",
"\n",
"## Partitioning the work\n",
"\n",
"In other words, how do you partition the data over all available tasks? For instance, say you have a list containing `N` elements (e.g., a list of `N` filenames) and `ntasks` available tasks. How do you distribute the `N` elements over the `ntasks` tasks such that these two conditions hold:\n",
"\n",
"1. Every element is processed\n",
"2. No element is processed twice \n",
"\n",
"There are two common mechanisms to partition the list:\n",
"\n",
"-----\n",
"\n",
"### Which task does what (Pattern 1)\n",
"\n",
"1. Assign the first element to task 0, the second element to task 1, the third element to task 2, and so on until the last task (task `ntasks-1`) has been assigned an element. Then assign the next work element to task 0 again, and repeat until all work elements have been assigned. \n",
"\n",
"\n",
"```python\n",
"# assumes rank is the unique identifier among all tasks and \n",
"# that rank goes from 0 to ntasks-1 (inclusive);\n",
"# do_processing and array stand for your own function and data\n",
"for i in range(rank, nwork, ntasks):\n",
"    do_processing(array[i])\n",
"```\n",
"\n",
"Consider three cases:\n",
"\n",
"1. The number of work units (`nwork`) is smaller than the number of available tasks (`ntasks`), i.e. `nwork < ntasks`\n",
"\n",
"If the total number of work units (`nwork`) is smaller than `ntasks`, then the tasks numbered from 0 to `nwork-1` will get one work unit apiece, and the remaining (`ntasks - nwork`) tasks will not have any work. \n",
"\n",
"---\n",
"\n",
"2. The number of work units (`nwork`) is *exactly* equal to the number of available tasks (`ntasks`), i.e., `nwork == ntasks`\n",
"\n",
"Then each task gets *exactly* one work unit. \n",
"\n",
"---\n",
"3. The number of work units (`nwork`) is larger than the number of available tasks (`ntasks`), i.e., `nwork > ntasks`\n",
"\n",
"Then each task gets at least `nwork // ntasks` work units. The remaining units (`nrem := nwork mod ntasks`) are distributed over the first `nrem` tasks, one extra unit each.\n",
"\n",
"---\n",
"\n",
"This partitioning scheme ensures that i) every element is processed and ii) no element is processed twice, regardless of the relative values of `N` and `ntasks`.\n",
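"\n",
"A quick numerical check of the three cases, as a sketch (`round_robin_indices` is a hypothetical helper that returns the work units Pattern 1 gives to one task):\n",
"\n",
"```python\n",
"def round_robin_indices(rank, nwork, ntasks):\n",
"    # Work units processed by task `rank` under Pattern 1\n",
"    return list(range(rank, nwork, ntasks))\n",
"\n",
"# nwork < ntasks, nwork == ntasks and nwork > ntasks\n",
"for nwork, ntasks in [(3, 5), (4, 4), (14, 3)]:\n",
"    counts = [len(round_robin_indices(rank, nwork, ntasks))\n",
"              for rank in range(ntasks)]\n",
"    print(nwork, ntasks, counts)\n",
"# 3 5 [1, 1, 1, 0, 0]\n",
"# 4 4 [1, 1, 1, 1]\n",
"# 14 3 [5, 5, 4]\n",
"```\n",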
"\n",
"\n",
"**This is the pattern we are going to follow here.**\n",
"\n",
"\n",
"### Which task does what (Pattern 2)\n",
"\n",
"\n",
"2. Calculate the average number of work units per task. Then assign a contiguous block of work units to each task. If the total number of work units is not exactly divisible by the number of tasks, then assign one additional work unit to each of the initial tasks until all the extra work units are assigned. (Make sure that the start of every later block accounts for these additional work units.)\n",
"\n",
"```python\n",
"nwork_per_task = nwork // ntasks\n",
"nrem = nwork % ntasks\n",
"\n",
"start = 0\n",
"for itask in range(ntasks):\n",
"    nwork_this_task = nwork_per_task\n",
"    if nrem > 0:\n",
"        nwork_this_task += 1\n",
"        nrem -= 1\n",
"\n",
"    end = start + nwork_this_task\n",
"    if rank == itask:\n",
"        break\n",
"\n",
"    start = end\n",
"\n",
"do_processing(array[start:end])\n",
"```\n",
"\n",
"This is the same logic as [np.array_split](https://numpy.org/doc/stable/reference/generated/numpy.array_split.html).\n",
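"\n",
"As a sketch of that equivalence, the loop above can be wrapped in a function (`block_range` is a hypothetical name) and compared with `np.array_split` for every task:\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"def block_range(rank, nwork, ntasks):\n",
"    # Pattern 2: half-open range [start, end) of the block for task `rank`\n",
"    nwork_per_task = nwork // ntasks\n",
"    nrem = nwork % ntasks\n",
"    start = 0\n",
"    for itask in range(ntasks):\n",
"        nwork_this_task = nwork_per_task\n",
"        if nrem > 0:\n",
"            nwork_this_task += 1\n",
"            nrem -= 1\n",
"        end = start + nwork_this_task\n",
"        if rank == itask:\n",
"            break\n",
"        start = end\n",
"    return start, end\n",
"\n",
"nwork, ntasks = 7, 4\n",
"array = np.arange(nwork)\n",
"blocks = np.array_split(array, ntasks)\n",
"for rank in range(ntasks):\n",
"    start, end = block_range(rank, nwork, ntasks)\n",
"    # Same elements as the rank-th chunk returned by np.array_split\n",
"    assert np.array_equal(array[start:end], blocks[rank])\n",
"    print(rank, array[start:end])\n",
"```\n",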
"\n",
"----\n",
"\n",
"<details> <summary> Advanced challenge (click to expand)</summary>\n",
" \n",
"The assumption in both of the patterns above is that every work unit takes the same amount of computing time. However, that is not necessarily the case for many real-world problems, and some units of work may take a lot more computing time than others. Ideally, when you are partitioning the `nwork` units over the `ntasks` tasks, you will want to distribute the work such that *every* task gets work that takes a similar total computing time. That way, all tasks finish at about the same time; otherwise, some tasks might finish early and sit idle while only a small number of the requested tasks are actually working (and then you might get pointed emails from supercomputer admins).\n",
"\n",
"*How would you distribute the work in such a scenario?*\n",
"Assume that you have an array of length `nwork` that contains an estimate of the computing time for each work unit.\n",
" \n",
"</details>\n",
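"\n",
"<details> <summary> One possible approach (click to expand)</summary>\n",
"\n",
"A common heuristic (not guaranteed to be optimal) is the greedy *longest-processing-time-first* rule: sort the work units by their estimated cost, largest first, and give each one to the task with the smallest total load so far. A minimal sketch, assuming `costs` holds the estimated computing time of each work unit (`greedy_partition` is a hypothetical name):\n",
"\n",
"```python\n",
"import numpy as np\n",
"\n",
"def greedy_partition(costs, ntasks):\n",
"    # Longest-processing-time-first: place the largest work units first,\n",
"    # each on the task with the smallest total load so far\n",
"    costs = np.asarray(costs, dtype=float)\n",
"    assigned_task = np.empty(len(costs), dtype=np.int64)\n",
"    load = np.zeros(ntasks)\n",
"    for i in np.argsort(costs)[::-1]:\n",
"        task = int(np.argmin(load))\n",
"        assigned_task[i] = task\n",
"        load[task] += costs[i]\n",
"    return assigned_task, load\n",
"\n",
"costs = [8.0, 1.0, 7.0, 2.0, 6.0, 3.0]\n",
"assigned_task, load = greedy_partition(costs, ntasks=2)\n",
"print(assigned_task)  # [0 0 1 0 1 0]\n",
"print(load)           # [14. 13.]\n",
"```\n",
"\n",
"For the same `costs`, Pattern 1 would give task 0 a total of 21.0 and task 1 only 6.0.\n",
"\n",
"</details>\n",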
"\n",
"----\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<hr style=\"border:2px solid gray\"> </hr>\n",
"\n",
"\n",
"## Where is the output produced\n",
"\n",
"In sequential code, there is only one task, and that task does the entire computation, so the output location is unique by default. For instance, imagine a code that writes out results to a file called \"results.txt\". In the sequential case, only one file is created, and that file contains all of the results. \n",
"\n",
"In distributed code, you have to ensure that each task writes its results to a unique location. For instance, if you simply run the previous code in a distributed fashion, then every task will write to a `results.txt` file, and the final output file will be corrupted (and its contents are likely to change every time). A standard resolution is for each task to write to `result_<tasknumber>.txt`, and then to collate all the files into a single `results.txt` file at the end (if necessary).\n",
"\n",
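"A minimal sketch of the `result_<tasknumber>.txt` approach. As assumptions for illustration, a plain loop over `rank` stands in for the separate tasks, a temporary directory stands in for the output directory, and the result of work unit `i` is simply `i * i`:\n",
"\n",
"```python\n",
"import os\n",
"import tempfile\n",
"\n",
"outdir = tempfile.mkdtemp()\n",
"ntasks = 3\n",
"nwork = 7\n",
"\n",
"# Each task writes only its own file, so no task overwrites another\n",
"for rank in range(ntasks):\n",
"    fname = os.path.join(outdir, f'result_{rank}.txt')\n",
"    with open(fname, 'wt') as f:\n",
"        for i in range(rank, nwork, ntasks):   # Pattern 1 partition\n",
"            print(i, i * i, file=f)            # work unit and its result\n",
"\n",
"# At the end (on one task only), collate into a single file\n",
"with open(os.path.join(outdir, 'results.txt'), 'wt') as fout:\n",
"    for rank in range(ntasks):\n",
"        with open(os.path.join(outdir, f'result_{rank}.txt'), 'rt') as fin:\n",
"            fout.write(fin.read())\n",
"\n",
"with open(os.path.join(outdir, 'results.txt'), 'rt') as f:\n",
"    lines = f.read().splitlines()\n",
"print(len(lines))  # 7\n",
"```\n",
"\n",
"In a real distributed run, the collation step should only start after every task has finished writing its own file, i.e., after a synchronisation point.\n",
"\n",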
"----\n",
"<details> <summary> Advanced Section (click to expand)</summary>\n",
" \n",
"Another option is to create a single file, but have each task write to a separate region (e.g., parallel hdf5):\n",
"\n",
" - You will need to compute (or know) the filesize for the entire computation. This means you will need to know the size of the result for every work unit\n",
" - (Typically) You will need to create the empty file with the correct total size\n",
" - Every task will need to know where to write the output results from processing a specific work unit (i.e., you will need to create a mapping that knows the byte-offset in the output file). If you know the size of every result (the byte-size of the result and *not* the values in bytes), then the byte-offset for each result is the sum of the byte-sizes of all preceding results (an exclusive cumulative sum, e.g., `np.cumsum(result_sizes) - result_sizes`)\n",
" \n",
"</details>\n",
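"\n",
"<details> <summary> Sketch: byte offsets in a single output file (click to expand)</summary>\n",
"\n",
"A minimal sketch of the byte-offset bookkeeping, using a plain binary file rather than parallel hdf5. It assumes every result is a block of `float64` values whose length is known in advance (the `result_lengths` values are made up); the offset of each result is the sum of the sizes of the results *before* it:\n",
"\n",
"```python\n",
"import os\n",
"import tempfile\n",
"import numpy as np\n",
"\n",
"# Number of float64 values in the result of each work unit\n",
"result_lengths = np.array([3, 1, 4, 2])\n",
"result_sizes = result_lengths * np.dtype(np.float64).itemsize  # bytes\n",
"# Exclusive cumulative sum: offset of result i = sum of sizes before i\n",
"offsets = np.cumsum(result_sizes) - result_sizes\n",
"total_size = int(result_sizes.sum())\n",
"\n",
"fname = os.path.join(tempfile.mkdtemp(), 'results.bin')\n",
"# Create the empty file with the correct total size\n",
"with open(fname, 'wb') as f:\n",
"    f.truncate(total_size)\n",
"\n",
"# Each work unit writes only to its own region, in any order\n",
"for iwork in [2, 0, 3, 1]:\n",
"    result = np.full(result_lengths[iwork], float(iwork))\n",
"    with open(fname, 'r+b') as f:\n",
"        f.seek(int(offsets[iwork]))\n",
"        f.write(result.tobytes())\n",
"\n",
"data = np.fromfile(fname, dtype=np.float64)\n",
"print(offsets)  # [ 0 24 32 64]\n",
"print(data)\n",
"```\n",
"\n",
"In a distributed run, each task would only write its own work units, and the file would be created once (by one task) before any task starts writing.\n",
"\n",
"</details>\n",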
"\n",
"----\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Let's look at how the partitioning works in practice\n",
"\n",
"<hr style=\"border:4px solid gray\"> </hr>"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def print_partition(nwork, ntasks, partition_type=1):\n",
"    import numpy as np\n",
"    # np.int was removed in NumPy 1.24; use an explicit integer type\n",
"    assigned_task = np.empty(nwork, dtype=np.int64)\n",
"    if partition_type == 1:\n",
"        task = 0\n",
"        for i in range(nwork):\n",
"            assigned_task[i] = task\n",
"            task += 1\n",
"            if task == ntasks: \n",
"                task = 0\n",
"    else:\n",
"        nwork_each_task = nwork // ntasks\n",
"        nrem = nwork % ntasks\n",
"        start = 0\n",
"        for itask in range(ntasks):\n",
"            nwork_this_task = nwork_each_task\n",
"            if nrem > 0:\n",
"                nwork_this_task += 1\n",
"                nrem -= 1\n",
"            assigned_task[start:start+nwork_this_task] = itask\n",
"            start += nwork_this_task\n",
"    \n",
"    print(f\"nwork = {nwork} ntasks = {ntasks} partition-type = {partition_type}\")\n",
"    print(\"{}\".format('-'*(nwork*5 + 8))) \n",
"    print(\"| Work |\", end=\"\")\n",
"    for i in range(nwork):\n",
"        print(\" {:02d} |\".format(i), end='')\n",
"    print(\"\")\n",
"    print(\"{}\".format('+'*(nwork*5 + 8))) \n",
"    \n",
"    print(\"| Task |\", end=\"\")\n",
"    for i in range(nwork):\n",
"        print(\" {:02d} |\".format(assigned_task[i]), end='')\n",
"    print(\"\")\n",
"    print(\"{}\".format('-'*(nwork*5 + 8))) \n",
"    \n",
"    return"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"nwork = 14 ntasks = 3 partition-type = 1\n",
"------------------------------------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 | 13 |\n",
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 |\n",
"------------------------------------------------------------------------------\n",
"\n",
"nwork = 14 ntasks = 3 partition-type = 2\n",
"------------------------------------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 | 13 |\n",
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 00 | 00 | 00 | 00 | 01 | 01 | 01 | 01 | 01 | 02 | 02 | 02 | 02 |\n",
"------------------------------------------------------------------------------\n",
"\n",
"\n",
"nwork = 3 ntasks = 5 partition-type = 1\n",
"-----------------------\n",
"| Work | 00 | 01 | 02 |\n",
"+++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 |\n",
"-----------------------\n",
"\n",
"nwork = 3 ntasks = 5 partition-type = 2\n",
"-----------------------\n",
"| Work | 00 | 01 | 02 |\n",
"+++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 |\n",
"-----------------------\n",
"\n",
"\n",
"nwork = 7 ntasks = 4 partition-type = 1\n",
"-------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 |\n",
"+++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 | 03 | 00 | 01 | 02 |\n",
"-------------------------------------------\n",
"\n",
"nwork = 7 ntasks = 4 partition-type = 2\n",
"-------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 |\n",
"+++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 00 | 01 | 01 | 02 | 02 | 03 |\n",
"-------------------------------------------\n",
"\n",
"\n",
"nwork = 8 ntasks = 4 partition-type = 1\n",
"------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 |\n",
"++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 | 03 | 00 | 01 | 02 | 03 |\n",
"------------------------------------------------\n",
"\n",
"nwork = 8 ntasks = 4 partition-type = 2\n",
"------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 |\n",
"++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 00 | 01 | 01 | 02 | 02 | 03 | 03 |\n",
"------------------------------------------------\n",
"\n",
"\n",
"nwork = 15 ntasks = 3 partition-type = 1\n",
"-----------------------------------------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 | 13 | 14 |\n",
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 | 02 |\n",
"-----------------------------------------------------------------------------------\n",
"\n",
"nwork = 15 ntasks = 3 partition-type = 2\n",
"-----------------------------------------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 | 13 | 14 |\n",
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 00 | 00 | 00 | 00 | 01 | 01 | 01 | 01 | 01 | 02 | 02 | 02 | 02 | 02 |\n",
"-----------------------------------------------------------------------------------\n",
"\n",
"\n"
]
}
],
"source": [
"nwork_and_ntasks = [(14, 3), (3, 5), (7, 4), (8, 4), (15, 3)]\n",
"for nw, nt in nwork_and_ntasks:\n",
"    for ptype in [1, 2]:\n",
"        print_partition(nw, nt, partition_type=ptype)\n",
"        print(\"\")\n",
"    print(\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### [Advanced] How would you improve the previous code?\n",
"\n",
"\n",
"\n",
"\n",
"----\n",
"\n",
"<details> <summary> Here's my attempt (Click to expand)</summary>\n",
"\n",
"\n",
"```python\n",
"def assign_partition(nwork, ntasks, partition_type=1, \n",
"                     pretty_print=True):\n",
"    \"\"\"\n",
"    Partitions a set of work over the specified number of tasks\n",
"    \n",
"    Parameters\n",
"    ----------\n",
"    nwork : int\n",
"        Number of work units (must be >= 0)\n",
"    ntasks : int\n",
"        Number of tasks (must be >= 1)\n",
"    partition_type : int, default 1\n",
"        1 assigns work units round-robin (Pattern 1),\n",
"        2 assigns contiguous blocks of work units (Pattern 2)\n",
"    pretty_print : bool, default True\n",
"        If True, print a table showing the task assigned to each work unit\n",
"    \n",
"    Returns\n",
"    -------\n",
"    assigned_task : numpy array of int64, shape (nwork,)\n",
"        The task assigned to each work unit\n",
"    \"\"\"\n",
"    import numpy as np\n",
"    def _pretty_print(assigned_tasks, nwork, ntasks, partition_type):\n",
"        get_numchars = lambda x : (x*5 + 8)\n",
"        print(f\"nwork = {nwork} ntasks = {ntasks} partition-type = {partition_type}\")\n",
"        print(\"{}\".format('-'*get_numchars(nwork)))\n",
"        print(\"| Work |\", end=\"\")\n",
"        for i in range(nwork):\n",
"            print(\" {:02d} |\".format(i), end='')\n",
"        print(\"\")\n",
"        print(\"{}\".format('+'*get_numchars(nwork))) \n",
"\n",
"        print(\"| Task |\", end=\"\")\n",
"        for i in range(nwork):\n",
"            print(\" {:02d} |\".format(assigned_tasks[i]), end='')\n",
"        print(\"\")\n",
"        print(\"{}\".format('-'*get_numchars(nwork))) \n",
"\n",
"        return\n",
"\n",
"    # the main function begins here\n",
"    if nwork < 0 or ntasks < 1:\n",
"        msg = f\"Error: Number of work units = {nwork} must \"\\\n",
"              f\"be >= 0 *and* the number of tasks = {ntasks} \"\\\n",
"              f\"must be >= 1\"\n",
"        raise ValueError(msg)\n",
"\n",
"    # You may additionally want to check that both\n",
"    # nwork and ntasks are scalar *and* integers\n",
"    \n",
"    assigned_task = np.empty(nwork, dtype=np.int64)\n",
"\n",
"    # Instead of writing the partitioning logic inline here, \n",
"    # you might want to write sub-functions for each \n",
"    # partition type, and then call the correct type \n",
"    # based on the partition type requested. \n",
"    # Raise NotImplementedError when an unknown partition type\n",
"    # is requested\n",
"    if partition_type == 1:\n",
"        task = 0\n",
"        for i in range(nwork):\n",
"            assigned_task[i] = task\n",
"            task += 1\n",
"            if task == ntasks: \n",
"                task = 0\n",
"    elif partition_type == 2:\n",
"        nwork_each_task = nwork // ntasks\n",
"        nrem = nwork % ntasks\n",
"        start = 0\n",
"        for itask in range(ntasks):\n",
"            nwork_this_task = nwork_each_task\n",
"            if nrem > 0:\n",
"                nwork_this_task += 1\n",
"                nrem -= 1\n",
"            assigned_task[start:start+nwork_this_task] = itask\n",
"            start += nwork_this_task\n",
"    else:\n",
"        msg = f\"Unknown partition type = {partition_type}\"\n",
"        raise NotImplementedError(msg)\n",
"    \n",
"    if pretty_print:\n",
"        _pretty_print(assigned_task, nwork, ntasks, partition_type)\n",
"    \n",
"    return assigned_task\n",
"```\n",
"\n",
"</details>\n",
"\n",
"----\n"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"def assign_partition(nwork, ntasks, partition_type=1, \n",
"                     pretty_print=True):\n",
"    \"\"\"\n",
"    Partitions a set of work over the specified number of tasks\n",
"    \n",
"    Parameters\n",
"    ----------\n",
"    nwork : int\n",
"        Number of work units (must be >= 0)\n",
"    ntasks : int\n",
"        Number of tasks (must be >= 1)\n",
"    partition_type : int, default 1\n",
"        1 assigns work units round-robin (Pattern 1),\n",
"        2 assigns contiguous blocks of work units (Pattern 2)\n",
"    pretty_print : bool, default True\n",
"        If True, print a table showing the task assigned to each work unit\n",
"    \n",
"    Returns\n",
"    -------\n",
"    assigned_task : numpy array of int64, shape (nwork,)\n",
"        The task assigned to each work unit\n",
"    \"\"\"\n",
"    import numpy as np\n",
"    def _pretty_print(assigned_tasks, nwork, ntasks, partition_type):\n",
"        get_numchars = lambda x : (x*5 + 8)\n",
"        print(f\"nwork = {nwork} ntasks = {ntasks} partition-type = {partition_type}\")\n",
"        print(\"{}\".format('-'*get_numchars(nwork)))\n",
"        print(\"| Work |\", end=\"\")\n",
"        for i in range(nwork):\n",
"            print(\" {:02d} |\".format(i), end='')\n",
"        print(\"\")\n",
"        print(\"{}\".format('+'*get_numchars(nwork))) \n",
"\n",
"        print(\"| Task |\", end=\"\")\n",
"        for i in range(nwork):\n",
"            print(\" {:02d} |\".format(assigned_tasks[i]), end='')\n",
"        print(\"\")\n",
"        print(\"{}\".format('-'*get_numchars(nwork))) \n",
"\n",
"        return\n",
"\n",
"    # the main function begins here\n",
"    if nwork < 0 or ntasks < 1:\n",
"        msg = f\"Error: Number of work units = {nwork} must \"\\\n",
"              f\"be >= 0 *and* the number of tasks = {ntasks} \"\\\n",
"              f\"must be >= 1\"\n",
"        raise ValueError(msg)\n",
"\n",
"    # You may additionally want to check that both\n",
"    # nwork and ntasks are scalar *and* integers\n",
"    \n",
"    assigned_task = np.empty(nwork, dtype=np.int64)\n",
"\n",
"    # Instead of writing the partitioning logic inline here, \n",
"    # you might want to write sub-functions for each \n",
"    # partition type, and then call the correct type \n",
"    # based on the partition type requested. \n",
"    # Raise NotImplementedError when an unknown partition type\n",
"    # is requested\n",
"    if partition_type == 1:\n",
"        task = 0\n",
"        for i in range(nwork):\n",
"            assigned_task[i] = task\n",
"            task += 1\n",
"            if task == ntasks: \n",
"                task = 0\n",
"    elif partition_type == 2:\n",
"        nwork_each_task = nwork // ntasks\n",
"        nrem = nwork % ntasks\n",
"        start = 0\n",
"        for itask in range(ntasks):\n",
"            nwork_this_task = nwork_each_task\n",
"            if nrem > 0:\n",
"                nwork_this_task += 1\n",
"                nrem -= 1\n",
"            assigned_task[start:start+nwork_this_task] = itask\n",
"            start += nwork_this_task\n",
"    else:\n",
"        msg = f\"Unknown partition type = {partition_type}\"\n",
"        raise NotImplementedError(msg)\n",
"    \n",
"    if pretty_print:\n",
"        _pretty_print(assigned_task, nwork, ntasks, partition_type)\n",
"    \n",
"    return assigned_task"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"nwork = 14 ntasks = 3 partition-type = 1\n",
"------------------------------------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 | 13 |\n",
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 |\n",
"------------------------------------------------------------------------------\n",
"\n",
"nwork = 14 ntasks = 3 partition-type = 2\n",
"------------------------------------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 | 13 |\n",
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 00 | 00 | 00 | 00 | 01 | 01 | 01 | 01 | 01 | 02 | 02 | 02 | 02 |\n",
"------------------------------------------------------------------------------\n",
"\n",
"\n",
"nwork = 3 ntasks = 5 partition-type = 1\n",
"-----------------------\n",
"| Work | 00 | 01 | 02 |\n",
"+++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 |\n",
"-----------------------\n",
"\n",
"nwork = 3 ntasks = 5 partition-type = 2\n",
"-----------------------\n",
"| Work | 00 | 01 | 02 |\n",
"+++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 |\n",
"-----------------------\n",
"\n",
"\n",
"nwork = 7 ntasks = 4 partition-type = 1\n",
"-------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 |\n",
"+++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 | 03 | 00 | 01 | 02 |\n",
"-------------------------------------------\n",
"\n",
"nwork = 7 ntasks = 4 partition-type = 2\n",
"-------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 |\n",
"+++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 00 | 01 | 01 | 02 | 02 | 03 |\n",
"-------------------------------------------\n",
"\n",
"\n",
"nwork = 8 ntasks = 4 partition-type = 1\n",
"------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 |\n",
"++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 | 03 | 00 | 01 | 02 | 03 |\n",
"------------------------------------------------\n",
"\n",
"nwork = 8 ntasks = 4 partition-type = 2\n",
"------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 |\n",
"++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 00 | 01 | 01 | 02 | 02 | 03 | 03 |\n",
"------------------------------------------------\n",
"\n",
"\n",
"nwork = 15 ntasks = 3 partition-type = 1\n",
"-----------------------------------------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 | 13 | 14 |\n",
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 | 02 | 00 | 01 | 02 |\n",
"-----------------------------------------------------------------------------------\n",
"\n",
"nwork = 15 ntasks = 3 partition-type = 2\n",
"-----------------------------------------------------------------------------------\n",
"| Work | 00 | 01 | 02 | 03 | 04 | 05 | 06 | 07 | 08 | 09 | 10 | 11 | 12 | 13 | 14 |\n",
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n",
"| Task | 00 | 00 | 00 | 00 | 00 | 01 | 01 | 01 | 01 | 01 | 02 | 02 | 02 | 02 | 02 |\n",
"-----------------------------------------------------------------------------------\n",
"\n",
"\n"
]
}
],
"source": [
"nwork_and_ntasks = [(14, 3), (3, 5), (7, 4), (8, 4), (15, 3)]\n",
"for nw, nt in nwork_and_ntasks:\n",
"    for ptype in [1, 2]:\n",
"        assigned_tasks = assign_partition(nw, nt, partition_type=ptype)\n",
"        print(\"\")\n",
"    print(\"\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Let's visualise the partition scheme"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# Taken from https://stackoverflow.com/a/28730546\n",
"def discrete_matshow(assigned_tasks, cmapname='viridis', ax=None):\n",
"    import numpy as np\n",
"    import matplotlib.pyplot as plt\n",
"    \n",
"    nwork = assigned_tasks.shape[0]\n",
"    data = assigned_tasks.reshape((1, nwork)) \n",
"    if not ax:\n",
"        fig, ax = plt.subplots()\n",
"    \n",
"    # get discrete colormap\n",
"    numcolors = assigned_tasks.max() - assigned_tasks.min() + 1\n",
"    cmap = plt.get_cmap(cmapname, numcolors)\n",
"\n",
"    ax.pcolor(data, cmap=cmap, edgecolors='k', linewidths=4)\n",
"    ax.set_xlim([0, nwork])\n",
"    ax.get_yaxis().set_visible(False)\n",
"    ax.get_xaxis().set_ticks(np.arange(nwork+1))\n",
"    ax.set_xlabel('Work Unit')"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"\n",
"nwork_and_ntasks = [(14, 3), (15, 3), (16, 3),\n",
"                    (9, 4), (9, 3), (9, 2),\n",
"                    (3, 3), (3, 4), (3, 5)]\n",
"\n",
"partition_types = [1, 2]\n",
"ncols = len(partition_types)\n",
"figheight = 1.0\n",
"figsize = (20.0 * figheight, figheight)\n",
"for nw, nt in nwork_and_ntasks:\n",
"    fig, axes = plt.subplots(1, ncols, figsize=figsize)\n",
"    fig.subplots_adjust(top=0.6)\n",
"    fig.suptitle(f\"Dividing {nw} work-units over {nt} tasks (colour shows task assigned)\", fontsize=24)\n",
"\n",
"    for ptype, ax in zip(partition_types, axes):\n",
"        assigned_tasks = assign_partition(nw, nt, partition_type=ptype, pretty_print=False)\n",
"        discrete_matshow(assigned_tasks, ax=ax)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"\n",
"<hr style=\"border:2px solid gray\"> </hr>\n",
"\n",
"## Using `mpi4py` for distributed computing\n",
"\n",
"The `mpi4py` package makes it very easy to write distributed code. The [mpi4py tutorial](https://mpi4py.readthedocs.io/en/stable/tutorial.html) contains a variety of examples that show how `mpi4py` works.\n",
"\n",
"Every process launched by MPI is associated with a \"communicator\". We will exclusively use the \"global\" communicator, `MPI.COMM_WORLD`, to figure out:\n",
"\n",
"1. How many tasks (`ntasks`) are in this computation -- `MPI.COMM_WORLD.Get_size()`\n",
"2. A unique ID (`rank`) that identifies each task -- `MPI.COMM_WORLD.Get_rank()`. The ranks run from `0` to `ntasks - 1`.\n"
]
},
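{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of these two calls is below. It assumes `mpi4py` and an MPI library are installed; the script name `hello_mpi.py` is hypothetical. Save the cell contents to that file and launch it from the command-line with, e.g., `mpiexec -n 4 python hello_mpi.py` (on some systems the launcher is called `mpirun` or `srun`). Each of the 4 processes should print its own `rank` (0 to 3) and the same `ntasks` (4); the order of the printed lines is not guaranteed. Run directly inside the notebook, it simply reports rank 0 of 1 task."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# hello_mpi.py (hypothetical file name): every MPI task reports who it is\n",
"from mpi4py import MPI\n",
"\n",
"comm = MPI.COMM_WORLD\n",
"rank = comm.Get_rank()    # unique ID of this task, 0 .. ntasks-1\n",
"ntasks = comm.Get_size()  # total number of tasks in COMM_WORLD\n",
"\n",
"print(f\"Hello from rank {rank} of {ntasks} tasks\")"
]
},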
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<hr style=\"border:2px solid gray\"> </hr>\n",
"\n",
"## Where is the output produced?\n",
"\n",
"We are going to take a shortcut and assume that the `processing_func` does not return any explicit output. Instead, each MPI task writes its results to its own unique output file, appending one line per processed input file. For instance, you could create a file called `results_<rank>.csv` and then append a line containing `filename, result` for every processed `filename`. Since no two tasks ever write to the same file, the tasks do not need to coordinate their writes."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"def your_custom_processing_func(input_fname, rank=0, outputfilebase='results'):\n",
"    \"\"\"Count the lines in `input_fname` and append `input_fname, numlines` to `<outputfilebase>_<rank>.csv`.\"\"\"\n",
"    numlines = None  # stays None (and is written out as such) if the file cannot be read\n",
"    try:\n",
"        with open(input_fname, 'r') as f:\n",
"            numlines = 0\n",
"            for line in f:\n",
"                numlines += 1\n",
"    except IOError:\n",
"        print(f\"[Rank={rank}]: Could not read input file = '{input_fname}'\")\n",
"\n",
"    # Now write the result into this task's own output file\n",
"    outputfilename = f\"{outputfilebase}_{rank}.csv\"\n",
"    with open(outputfilename, 'a') as f:\n",
"        f.write(f\"{input_fname}, {numlines}\\n\")\n",
"\n",
"    return"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[Rank=0]: Converting nfiles = 1 over ntasks = 1...\n",
"[Rank=0]: Converting nfiles = 1 over ntasks = 1...done. Time taken = 0.00 seconds\n"
]
}
],
"source": [
"def distributed_processing(filenames, processing_func=None, outputfilebase='results'):\n",
"    import sys\n",
"    import time\n",
"\n",
"    # Fall back to a single (serial) task if mpi4py is not available\n",
"    rank = 0\n",
"    ntasks = 1\n",
"    comm = None\n",
"    try:\n",
"        from mpi4py import MPI\n",
"        comm = MPI.COMM_WORLD\n",
"        rank = comm.Get_rank()\n",
"        ntasks = comm.Get_size()\n",
"    except ImportError:\n",
"        pass\n",
"\n",
"    if processing_func is None:\n",
"        raise ValueError(\"A processing function must be supplied\")\n",
"\n",
"    # Protect against the case where a single\n",
"    # file was passed\n",
"    if not isinstance(filenames, (list, tuple)):\n",
"        filenames = [filenames]\n",
"\n",
"    sys.stdout.flush()\n",
"    nfiles = len(filenames)\n",
"    if rank == 0 and nfiles < ntasks:\n",
"        print(f\"[Rank={rank}]: Nfiles = {nfiles} < total tasks = {ntasks}. \"\n",
"              \"Some tasks will not have any work assigned (and will be idle)\")\n",
"\n",
"    tstart = time.perf_counter()\n",
"    if rank == 0:\n",
"        print(f\"[Rank={rank}]: Converting nfiles = {nfiles} over ntasks = {ntasks}...\")\n",
"\n",
"    # Process files in MPI parallel (if requested).\n",
"    # The range produces filenum starting at \"rank\" and then\n",
"    # increments by \"ntasks\", never exceeding [nfiles-1].\n",
"    # Taken over all ranks, every index in [0, nfiles-1]\n",
"    # is assigned to exactly one task.\n",
"    for filenum in range(rank, nfiles, ntasks):\n",
"        processing_func(filenames[filenum], rank=rank, outputfilebase=outputfilebase)\n",
"\n",
"    # The barrier is only essential so that the total time printed\n",
"    # out on rank==0 is correct.\n",
"    if comm is not None:\n",
"        comm.Barrier()\n",
"\n",
"    if rank == 0:\n",
"        t1 = time.perf_counter()\n",
"        print(f\"[Rank={rank}]: Converting nfiles = {nfiles} over ntasks = {ntasks}...done. \"\n",
"              f\"Time taken = {t1 - tstart:0.2f} seconds\")\n",
"\n",
"    return True\n",
"\n",
"\n",
"if __name__ == \"__main__\":\n",
"    import glob\n",
"    filenames = glob.glob(\"*.txt\")\n",
"    distributed_processing(filenames, your_custom_processing_func)"
]
},
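{
"cell_type": "markdown",
"metadata": {},
"source": [
"To see how `range(rank, nfiles, ntasks)` splits the files, the sketch below simply loops over every `rank` in a single (serial) process instead of running under MPI. The values `nfiles = 7` and `ntasks = 3` are arbitrary choices for illustration. Every file index appears exactly once, and the number of files per task differs by at most one."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Serial illustration of the round-robin split used in distributed_processing\n",
"nfiles, ntasks = 7, 3  # arbitrary example values\n",
"\n",
"assigned = {rank: list(range(rank, nfiles, ntasks)) for rank in range(ntasks)}\n",
"for rank, filenums in assigned.items():\n",
"    print(f\"rank {rank} processes file indices {filenums}\")\n",
"# rank 0 processes file indices [0, 3, 6]\n",
"# rank 1 processes file indices [1, 4]\n",
"# rank 2 processes file indices [2, 5]\n",
"\n",
"# Every index in [0, nfiles-1] is assigned to exactly one rank\n",
"all_indices = sorted(i for filenums in assigned.values() for i in filenums)\n",
"assert all_indices == list(range(nfiles))"
]
},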
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Switch to the command-line to run MPI code"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Challenge: Every time you re-run the code, new results are appended to the existing output files. How would you fix that?"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### [Advanced] What happens if the input files have been modified?\n",
"\n",
"----\n",
"\n",
"<details> <summary> Recommended Practice (click to expand)</summary>\n",
" \n",
"A couple of things to consider:\n",
"\n",
"1. When was the input file last modified?\n",
"2. When did the line-counter (i.e., this code) run?\n",
"3. Is it possible that file contents have been modified without changing the number of lines?\n",
"\n",
"Of course, if a file has not been modified, then there is no need to append another result. If the file has been updated but the number of lines has not changed, how do you know whether the file content was altered?\n",
"\n",
"The generic recommendation is to **always record metadata**. For instance, in this case, you might want to consider recording the following (in increasing order of complexity/paranoia/thoroughness):\n",
"\n",
"1. The file modification time for the input file\n",
"2. A hash of the input file contents (MD5, SHA-256, ...)\n",
"3. The date-time when the line-counter program was run\n",
"4. A git SHA identifying the source commit, or a hash of the entire source code if there are uncommitted changes\n",
"\n",
"\n",
"</details>\n",
"\n",
"----\n",
"\n"
]
},
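{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a rough sketch of the first three items, the hypothetical helper `file_metadata` below collects the modification time and a SHA-256 hash of an input file, plus the time at which it was called, using only the Python standard library. Writing these values next to `numlines` in the output file would let a later run detect whether an input file has changed since it was last processed, e.g. by comparing the stored hash with `file_metadata(fname)[1]`."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import datetime\n",
"import hashlib\n",
"import os\n",
"\n",
"def file_metadata(fname, chunksize=1 << 20):\n",
"    \"\"\"Hypothetical helper: return (mtime, sha256 hex digest, processing time) for `fname`.\"\"\"\n",
"    mtime = os.path.getmtime(fname)  # seconds since the epoch\n",
"    sha = hashlib.sha256()\n",
"    with open(fname, 'rb') as f:\n",
"        # Hash in chunks so that large files are not read into memory at once\n",
"        for chunk in iter(lambda: f.read(chunksize), b''):\n",
"            sha.update(chunk)\n",
"    processed_at = datetime.datetime.now().isoformat()\n",
"    return mtime, sha.hexdigest(), processed_at"
]
},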
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Running parallel code within Jupyter notebooks\n",
"\n",
"This can be done, but it takes a bit of setup. I followed the instructions [here](https://charlesreid1.com/wiki/Jupyter/MPI) to set up `ipyparallel` and then ran the following chunk of code:"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Client MPI.COMM_WORLD.Get_size()=1\n",
"Client engine ids [0, 1]\n",
"Remote COMM_WORLD ranks [0, 1]\n",
"Remote COMM_WORLD size [2, 2]\n"
]
}
],
"source": [
"from ipyparallel import Client\n",
"from mpi4py import MPI\n",
"\n",
"# Connect to the engines started with the 'mpi' ipyparallel profile\n",
"c = Client(profile='mpi')\n",
"view = c[:]\n",
"\n",
"# The notebook (client) itself is a single process outside the MPI job\n",
"print(\"Client MPI.COMM_WORLD.Get_size()=%s\" % MPI.COMM_WORLD.Get_size())\n",
"print(\"Client engine ids %s\" % c.ids)\n",
"\n",
"def _get_rank():\n",
"    from mpi4py import MPI\n",
"    return MPI.COMM_WORLD.Get_rank()\n",
"\n",
"def _get_size():\n",
"    from mpi4py import MPI\n",
"    return MPI.COMM_WORLD.Get_size()\n",
"\n",
"# Run the functions on every engine; the engines share one MPI COMM_WORLD\n",
"print(\"Remote COMM_WORLD ranks %s\" % view.apply_sync(_get_rank))\n",
"print(\"Remote COMM_WORLD size %s\" % view.apply_sync(_get_size))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}