To parallelise code, either multiple processes or multiple threads can be created from the main process. Compared to processes, threads have several properties:
- They are a bit faster to start up.
- They can access the memory of all other threads, so sharing data is easy.
- They have lower memory consumption.
- Because they access the same memory, there are many ways errors can happen. For example, two threads must never write to the same variable at the same time. The code must be written in a special thread-safe way, so it is easier to make errors.
Python itself is not fully thread-safe, but still allows threads. It therefore has a special concept called the GIL (global interpreter lock): threads are allowed, but only one runs at a time while the others are paused. Threads therefore only bring performance improvements for tasks that are not CPU-demanding, e.g. reading from a file in the background. Libraries used from Python but written in other languages like C (e.g. Pandas or NumPy) are not affected by this.
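As a small illustration of the shared-memory side of threads (a minimal sketch; the function and variable names are made up for this example), a background thread can write directly into a plain Python list that the main thread reads afterwards:

```python
import threading

shared_results = []  # ordinary list in shared memory, visible to every thread

def background_work(lines):
    # Simulates e.g. processing a file in the background;
    # the thread appends directly to the shared list.
    for line in lines:
        shared_results.append(line.upper())

worker = threading.Thread(target=background_work, args=(["a", "b", "c"],))
worker.start()
# ... the main thread could do other (non-CPU-bound) work here ...
worker.join()  # wait for the background thread to finish
print(shared_results)
```

No locking is needed here only because the main thread waits with join() before reading; as soon as both threads write concurrently, the thread-safety problems described above apply.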
We therefore have to use multiple processes to make use of multiple CPU cores.
Unix has a simple way to create a parallel process with "fork" (https://en.wikipedia.org/wiki/Fork_(system_call)).
Fork creates two exact copies of the process; the only difference is the process id (pid) returned from the fork call:
import os

pid = os.fork()
if pid == 0:
    print(f"We are in child process, pid={pid}")
    print('Exiting child process ...')
    os._exit(0)
else:
    print(f"We are in parent process, the pid of the child is {pid}.")
    print("Waiting for the child process to finish ...")
    os.waitpid(pid, 0)  # second parameter is always 0 for normal use
Fork is not available on Windows except in the WSL environment.
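The return value of os.waitpid can also be used to check how the child exited. It returns a (pid, status) tuple, and os.WEXITSTATUS extracts the exit code that the child passed to os._exit (a hedged sketch for Unix systems; the helper function is invented for illustration):

```python
import os

def run_child(exit_code):
    """Fork a child that exits immediately with exit_code,
    then return the exit code the parent observed."""
    pid = os.fork()
    if pid == 0:
        os._exit(exit_code)  # child: terminate right away
    finished_pid, status = os.waitpid(pid, 0)  # parent: block until the child exits
    return os.WEXITSTATUS(status)

print(run_child(0))  # a clean exit
print(run_child(1))  # a failure exit, like os._exit(1)
```

Checking the status this way lets the parent detect workers that exited with a non-zero code.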
With fork, we can easily parallelise a for loop. Non-parallel example:
import numpy as np

def long_calculation(value: float) -> float:
    return value * 2.

values_for_calculation = np.random.rand(100)
all_results = []
for value in values_for_calculation:
    result = long_calculation(value)
    all_results.append(result)
Parallel example:
import os
import numpy as np

number_cores = 2  # set to the number of cores, e.g. with multiprocessing.cpu_count()

def long_calculation(value: float) -> float:
    return value * 2.

values_for_calculation = np.random.rand(100)
values_for_calculation_splitted = np.array_split(values_for_calculation, number_cores)  # split into number_cores sublists
all_process_ids = []
for i, sub_list in enumerate(values_for_calculation_splitted):
    print(f'Starting worker {i} ...')
    pid = os.fork()
    if pid == 0:  # we are in the child process
        try:
            all_results_worker = []
            for value in sub_list:
                result = long_calculation(value)
                all_results_worker.append(str(result))
            with open(f'results_{i}.csv', 'w') as f:  # write results to a CSV file
                f.write(','.join(all_results_worker))
            os._exit(0)
        except Exception as e:  # normally narrower exceptions like FileNotFoundError should be used, but here we really want to catch everything
            print(e)
            os._exit(1)
    else:
        all_process_ids.append(pid)
for pid in all_process_ids:
    os.waitpid(pid, 0)
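How np.array_split divides the work can be checked in isolation (a small sketch with made-up numbers): unlike np.split, it also accepts sizes that do not divide evenly and simply makes the first chunks one element longer:

```python
import numpy as np

values = np.arange(10)
chunks = np.array_split(values, 3)  # 10 elements into 3 sublists
print([len(chunk) for chunk in chunks])  # [4, 3, 3]
```

Every worker therefore gets roughly the same amount of work, and no value is lost or duplicated.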
The CSV files can then be read and concatenated:
import glob
import pandas as pd

all_dataframes = []
for filename in glob.glob('results_*.csv'):
    df = pd.read_csv(filename, header=None)
    all_dataframes.append(df)
together = pd.concat(all_dataframes)
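Since fork is Unix-only, the same loop can also be parallelised portably with the standard multiprocessing module already mentioned above for cpu_count (a hedged sketch; Pool and the helper function name are an alternative to, not part of, the fork example). A Pool distributes the values over worker processes and collects the results directly, without going through CSV files:

```python
import multiprocessing

def long_calculation(value: float) -> float:
    return value * 2.

def parallel_calculation(values, number_cores=2):
    # Pool starts number_cores worker processes (via fork on Unix,
    # spawn on Windows) and sends each one a share of the values.
    with multiprocessing.Pool(processes=number_cores) as pool:
        return pool.map(long_calculation, values)

if __name__ == '__main__':
    print(parallel_calculation([1.0, 2.0, 3.0]))
```

The if __name__ == '__main__' guard is required on platforms that use the spawn start method, where the worker processes re-import the script.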