@Ununnilium
Created May 18, 2021 08:52

Parallelise Code in Python with Fork

To parallelise code, either multiple processes or multiple threads can be created from the main process.

Advantages of threads:

  • A bit faster to start up
  • They can access the memory of all other threads, so sharing data is easy
  • Lower memory consumption

Disadvantages of threads:

  • Because they share the same memory, there are many ways errors can happen. For example, two threads must never write to the same variable at the same time. The code must be written in a thread-safe way, so it is easier to make mistakes.

Python is not written to be fully thread-safe, but it still allows threads. It therefore has a special concept called the GIL (global interpreter lock): threads are allowed, but only one runs at a time while the others are paused. Threads therefore only bring a performance improvement for work that is not CPU-demanding, e.g. reading from a file in the background. Libraries used from Python but written in other languages like C (e.g. Pandas or NumPy) are not affected by this.
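
Threads are still useful for waiting on input/output, because the GIL is released while the operating system does the work. A minimal sketch (the file names a.txt and b.txt are only placeholders):

import threading

results = {}

def read_file(path: str) -> None:
    with open(path) as f:
        results[path] = f.read()  # the GIL is released while waiting for the disk

threads = [threading.Thread(target=read_file, args=(path,)) for path in ('a.txt', 'b.txt')]
for t in threads:
    t.start()  # start both threads
for t in threads:
    t.join()   # wait until both files have been read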

We therefore have to use multiple processes to make use of multiple CPU cores.

The Unix call "fork"

Unix has a simple way to create a parallel process with "fork" (https://en.wikipedia.org/wiki/Fork_(system_call)).

Fork duplicates the process so that two identical copies continue running; the only difference is the process id (pid) returned from the fork call: in the child it is 0, in the parent it is the pid of the child:

import os

pid = os.fork()
if pid == 0:
    print(f"We are in child process, pid={pid}")
    print('Exiting child process ...')
    os._exit(0)
else:
    print(f"We are in parent process, the pid of the child is {pid}.")
    print("Waiting for the child process to finish ...")
    os.waitpid(pid, 0)  # second parameter is always 0 for normal use

Fork is not available on Windows except in the WSL environment.
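
If the script may also be started on a platform without fork, a simple guard can be added (a minimal sketch):

import os
import sys

if not hasattr(os, 'fork'):  # e.g. native Windows
    sys.exit('os.fork is not available on this platform')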

With fork, we can easily parallelise a for loop. Non-parallel example:

import numpy as np


def long_calculation(value: float) -> float:
    return value * 2.

values_for_calculation = np.random.rand(100)

all_results = []
for value in values_for_calculation:
    result = long_calculation(value)
    all_results.append(result)

Parallel example:

import os
import numpy as np

number_cores = 2  # set it to the number of cores, e.g. with multiprocessing.cpu_count()


def long_calculation(value: float) -> float:
    return value * 2.

values_for_calculation = np.random.rand(100)
values_for_calculation_splitted = np.array_split(values_for_calculation, number_cores)  # split into number_cores sub-arrays

all_process_ids = []

for i, sub_list in enumerate(values_for_calculation_splitted):
    print(f'Starting worker {i} ...')
    pid = os.fork()
    if pid == 0:  # we are in the child process
        try:
            all_results_worker = []
            for value in sub_list:
                result = long_calculation(value)
                all_results_worker.append(str(result))
            with open(f'results_{i}.csv', 'w') as f:  # write the results to a CSV file, one result per line
                f.write('\n'.join(all_results_worker))
            os._exit(0)
        except Exception as e:  # normally a narrower exception like FileNotFoundError should be caught, but here we really want to catch everything
            print(e)
            os._exit(1)
    else:  # we are in the parent process
        all_process_ids.append(pid)

for pid in all_process_ids:
    os.waitpid(pid, 0)  # second parameter is always 0 for normal use
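
Instead of the simple wait loop above, the status value returned by waitpid can be inspected with the standard helpers from the os module if we also want to know whether a worker failed (a minimal sketch):

for pid in all_process_ids:
    _, status = os.waitpid(pid, 0)
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0:
        print(f'Worker with pid {pid} exited with code {os.WEXITSTATUS(status)}')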

The CSV files can be read and concatenated:

import glob
import pandas as pd

all_dataframes = []
for filename in glob.glob('results_*.csv'):
    df = pd.read_csv(filename, header=None, names=['result'])  # each file contains one result per line
    all_dataframes.append(df)

together = pd.concat(all_dataframes)
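
Optionally, the temporary worker files can be deleted afterwards (a small sketch, assuming no other files match the pattern):

import glob
import os

for filename in glob.glob('results_*.csv'):
    os.remove(filename)  # delete the temporary worker output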