{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Python multiprocessing, joblib\n",
"\n",
"- GIL\n",
"\n",
"\n",
"> The Python interpreter is not fully thread-safe. In order to support multi-threaded Python programs, there’s a global lock, called the global interpreter lock or GIL, that must be held by the current thread before it can safely access Python objects. Without the lock, even the simplest operations could cause problems in a multi-threaded program: for example, when two threads simultaneously increment the reference count of the same object, the reference count could end up being incremented only once instead of twice.\n",
"\n",
"- GIL is in CPython, but absent in other implementations\n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import time\n",
"import numpy as np\n",
"from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor\n",
"\n",
"def wait():\n",
" time.sleep(1)\n",
" return 42"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 85.1 ms, sys: 16.8 ms, total: 102 ms\n",
"Wall time: 1 s\n"
]
}
],
"source": [
"%%time\n",
"with ThreadPoolExecutor(max_workers=2) as pool:\n",
" a = pool.submit(wait)\n",
" b = pool.submit(wait)\n",
" a.result()\n",
" b.result()"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 6.62 ms, sys: 8.97 ms, total: 15.6 ms\n",
"Wall time: 1.02 s\n"
]
}
],
"source": [
"%%time\n",
"with ProcessPoolExecutor(max_workers=2) as pool:\n",
" a = pool.submit(wait)\n",
" b = pool.submit(wait)\n",
" a.result()\n",
" b.result()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## strangely, \n",
"pools are able to use locally defined functions. "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## GIL is not necessarily a show-stopper\n",
"\n",
"- Common opinion that threading is completely useless too get speedup in python\n",
"- Some libs release GIL when entering long computations, those can be threaded efficiently\n",
"- go with threading if most of your computations don't require GIL"
]
},
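{
"cell_type": "markdown",
"metadata": {},
"source": [
"A quick counter-example, sketched but not executed here (`count_up` is a made-up CPU-bound function): pure-Python bytecode never releases the GIL, so two threads running it should take roughly twice the wall time of a single call, unlike the OpenCV example below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%time\n",
"# pure-Python loop: the GIL is held for the whole computation,\n",
"# so the two submitted calls cannot actually run in parallel\n",
"def count_up(n=5_000_000):\n",
"    total = 0\n",
"    for i in range(n):\n",
"        total += i\n",
"    return total\n",
"\n",
"with ThreadPoolExecutor(max_workers=2) as pool:\n",
"    a = pool.submit(count_up)\n",
"    b = pool.submit(count_up)\n",
"    a.result()\n",
"    b.result()"
]
},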
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 1.07 s, sys: 15.8 ms, total: 1.08 s\n",
"Wall time: 1.09 s\n"
]
},
{
"data": {
"text/plain": [
"507938211"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import cv2\n",
"def computation():\n",
" x = np.random.randint(0, 255, [2000, 2000]).astype('uint8')\n",
" cv2.HoughLines(x, 1, np.pi/180, 20)\n",
" return x.sum()\n",
"\n",
"%time computation()"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.07 s, sys: 21.5 ms, total: 2.09 s\n",
"Wall time: 1.08 s\n"
]
}
],
"source": [
"%%time\n",
"with ThreadPoolExecutor(max_workers=2) as executor:\n",
" a = executor.submit(computation)\n",
" b = executor.submit(computation)\n",
" a.result()\n",
" b.result()"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 5.12 ms, sys: 9.74 ms, total: 14.9 ms\n",
"Wall time: 1.1 s\n"
]
}
],
"source": [
"%%time\n",
"with ProcessPoolExecutor(max_workers=2) as executor:\n",
" a = executor.submit(computation)\n",
" b = executor.submit(computation)\n",
" a.result()\n",
" b.result()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# another default pool, has different version of map\n",
"from multiprocessing import Pool"
]
},
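{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch (not executed): `Pool.map` is eager, returns a plain list and accepts a `chunksize`, unlike `Executor.map` from `concurrent.futures`, which returns a lazy iterator. The `square` helper is made up for illustration; with the default fork start method on Linux/macOS it can be defined right in the notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Pool.map blocks until everything is done and returns a list\n",
"def square(x):\n",
"    return x * x\n",
"\n",
"with Pool(processes=2) as pool:\n",
"    print(pool.map(square, range(10)))\n",
"    print(pool.map(square, range(10), chunksize=5))  # same result, fewer IPC round-trips"
]
},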
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Python's parllelizing tools collection\n",
"\n",
"- `threading`, `multiprocessing` - older ones, built-in\n",
"- `multiproccess` - third-party\n",
"- `concurrent.futures` - newer one, built-in, narrower functionality, but seemingly more friendly and uniform\n",
"- `joblib` - simple, quite-old, third-party\n",
"\n",
"there are many other packages (I've used ipyparallel for some time)"
]
},
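{
"cell_type": "markdown",
"metadata": {},
"source": [
"A small sketch (not executed) of the uniform `concurrent.futures` interface mentioned above: switching between threads and processes is a one-word change."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# the same code runs on a thread pool and on a process pool\n",
"for Executor in (ThreadPoolExecutor, ProcessPoolExecutor):\n",
"    with Executor(max_workers=2) as executor:\n",
"        print(Executor.__name__, list(executor.map(abs, [-1, -2, -3])))"
]
},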
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Joblib\n",
"\n",
"`>90%` of cases when I need parallelization is just map, \n",
"and I don't need to think about exceptions raised (any exception is a reason for investigations). \n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Main backend is `loky` (copy-pasted description)\n",
"\n",
"- Consistent and robust spawn behavior: All processes are started using fork + exec on POSIX systems. This ensures safer interactions with third party libraries. On the contrary, multiprocessing.Pool uses fork without exec by default, causing third party runtimes to crash (e.g. OpenMP, macOS Accelerate...).\n",
"- Reusable executor: strategy to avoid re-spawning a complete executor every time. A singleton executor instance can be reused (and dynamically resized if necessary) across consecutive calls to limit spawning and shutdown overhead. The worker processes can be shutdown automatically after a configurable idling timeout to free system resources.\n",
"- Transparent cloudpickle integration: to call interactively defined functions and lambda expressions in parallel. It is also possible to register a custom pickler implementation to handle inter-process communications.\n",
"- No need for `if __name__ == \"__main__\"`: in scripts: thanks to the use of cloudpickle to call functions defined in the __main__ module, it is not required to protect the code calling parallel functions under Windows.\n",
"- Deadlock free implementation: one of the major concern in standard multiprocessing and concurrent.futures modules is the ability of the Pool/Executor to handle crashes of worker processes. This library intends to fix those possible deadlocks and send back meaningful errors. Note that the implementation of concurrent.futures.ProcessPoolExecutor that comes with Python 3.7+ is as robust as the executor from loky but the later also works for older versions of Python.\n"
]
},
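{
"cell_type": "markdown",
"metadata": {},
"source": [
"A sketch of the reusable-executor point above (not executed; assumes the standalone `loky` package is importable, it is also vendored as `joblib.externals.loky`)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from loky import get_reusable_executor\n",
"\n",
"# the first call spawns the workers; later calls reuse (and can resize) them\n",
"executor = get_reusable_executor(max_workers=2, timeout=10)  # workers shut down after 10 s idle\n",
"print(executor.submit(sum, [1, 2, 3]).result())\n",
"executor = get_reusable_executor(max_workers=4, timeout=10)  # same pool, resized"
]
},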
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"import joblib"
]
},
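{
"cell_type": "markdown",
"metadata": {},
"source": [
"The canonical pattern mentioned above, a parallel map in one line (a sketch, not executed here)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Parallel(...) returns a callable that consumes an iterable of delayed calls\n",
"squares = joblib.Parallel(n_jobs=2)(joblib.delayed(pow)(i, 2) for i in range(10))\n",
"squares"
]
},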
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"# example when we want to use kwargs and args\n",
"result1 = [np.asarray(i, dtype='float32') for i in range(10)]\n",
"\n",
"delayed_asarray = joblib.delayed(np.asarray)\n",
"\n",
"with joblib.Parallel(n_jobs=2) as p: \n",
" result2 = p([delayed_asarray(i, dtype='float32') for i in range(10)])"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]"
]
},
"execution_count": 10,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# deals with lambda\n",
"with joblib.Parallel(n_jobs=2) as p: \n",
" results = p(joblib.delayed(lambda x: x)(i) for i in range(10))\n",
"results"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]\n"
]
}
],
"source": [
"# perfecctly works with locally defined functions, can extract parts of context\n",
"def make_computations():\n",
" some_variable = []\n",
" def process_chunk(chunk):\n",
" return chunk + len(some_variable)\n",
" with joblib.Parallel(n_jobs=2) as p: \n",
" results = p(joblib.delayed(process_chunk)(i) for i in range(10))\n",
" print(results)\n",
" \n",
"make_computations()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Other useful things:\n",
"\n",
"- verbosity level\n",
"- prefetching and iterators\n",
"- can pass large numpy objects to subprocesses by memmaping"
]
},
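{
"cell_type": "markdown",
"metadata": {},
"source": [
"A sketch of the prefetching point (not executed; `make_inputs` is a made-up generator): `pre_dispatch` bounds how many tasks are materialized from a lazy generator ahead of the workers, which keeps memory in check when the inputs themselves are large."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def make_inputs():\n",
"    for i in range(10):\n",
"        yield np.full(1000, i)  # imagine each chunk is large or expensive to build\n",
"\n",
"# pre_dispatch='2 * n_jobs' (the default) pulls only a few items from the generator at a time\n",
"with joblib.Parallel(n_jobs=2, pre_dispatch='2 * n_jobs') as p:\n",
"    results = p(joblib.delayed(np.sum)(chunk) for chunk in make_inputs())"
]
},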
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"large_array = np.zeros(1_000_000)"
]
},
{
"cell_type": "code",
"execution_count": 48,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"[Parallel(n_jobs=2)]: Using backend LokyBackend with 2 concurrent workers.\n",
"Memmapping (shape=(1000000,), dtype=float64) to new file /var/folders/m7/d0p8pv5s6x5g09z3rwkgp24m0000gn/T/joblib_memmapping_folder_7909_5835429116/7909-5053848936-74aa42aa909b4957be743fdb6739ad5f.pkl\n",
"Memmapping (shape=(1000000,), dtype=float64) to old file /var/folders/m7/d0p8pv5s6x5g09z3rwkgp24m0000gn/T/joblib_memmapping_folder_7909_5835429116/7909-5053848936-74aa42aa909b4957be743fdb6739ad5f.pkl\n",
"Memmapping (shape=(1000000,), dtype=float64) to old file /var/folders/m7/d0p8pv5s6x5g09z3rwkgp24m0000gn/T/joblib_memmapping_folder_7909_5835429116/7909-5053848936-74aa42aa909b4957be743fdb6739ad5f.pkl\n",
"Memmapping (shape=(1000000,), dtype=float64) to old file /var/folders/m7/d0p8pv5s6x5g09z3rwkgp24m0000gn/T/joblib_memmapping_folder_7909_5835429116/7909-5053848936-74aa42aa909b4957be743fdb6739ad5f.pkl\n",
"[Parallel(n_jobs=2)]: Done 1 tasks | elapsed: 0.3s\n",
"Memmapping (shape=(1000000,), dtype=float64) to old file /var/folders/m7/d0p8pv5s6x5g09z3rwkgp24m0000gn/T/joblib_memmapping_folder_7909_5835429116/7909-5053848936-74aa42aa909b4957be743fdb6739ad5f.pkl\n",
"[Parallel(n_jobs=2)]: Done 2 tasks | elapsed: 0.3s\n",
"Memmapping (shape=(1000000,), dtype=float64) to old file /var/folders/m7/d0p8pv5s6x5g09z3rwkgp24m0000gn/T/joblib_memmapping_folder_7909_5835429116/7909-5053848936-74aa42aa909b4957be743fdb6739ad5f.pkl\n",
"[Parallel(n_jobs=2)]: Done 3 tasks | elapsed: 0.3s\n",
"Memmapping (shape=(1000000,), dtype=float64) to old file /var/folders/m7/d0p8pv5s6x5g09z3rwkgp24m0000gn/T/joblib_memmapping_folder_7909_5835429116/7909-5053848936-74aa42aa909b4957be743fdb6739ad5f.pkl\n",
"[Parallel(n_jobs=2)]: Done 4 tasks | elapsed: 0.3s\n",
"[Parallel(n_jobs=2)]: Done 5 tasks | elapsed: 0.3s\n",
"Memmapping (shape=(1000000,), dtype=float64) to old file /var/folders/m7/d0p8pv5s6x5g09z3rwkgp24m0000gn/T/joblib_memmapping_folder_7909_5835429116/7909-5053848936-74aa42aa909b4957be743fdb6739ad5f.pkl\n",
"Memmapping (shape=(1000000,), dtype=float64) to old file /var/folders/m7/d0p8pv5s6x5g09z3rwkgp24m0000gn/T/joblib_memmapping_folder_7909_5835429116/7909-5053848936-74aa42aa909b4957be743fdb6739ad5f.pkl\n",
"[Parallel(n_jobs=2)]: Done 6 tasks | elapsed: 0.3s\n",
"[Parallel(n_jobs=2)]: Done 7 tasks | elapsed: 0.3s\n",
"[Parallel(n_jobs=2)]: Batch computation too fast (0.1725s.) Setting batch_size=2.\n",
"Memmapping (shape=(1000000,), dtype=float64) to old file /var/folders/m7/d0p8pv5s6x5g09z3rwkgp24m0000gn/T/joblib_memmapping_folder_7909_5835429116/7909-5053848936-74aa42aa909b4957be743fdb6739ad5f.pkl\n",
"[Parallel(n_jobs=2)]: Done 8 out of 10 | elapsed: 0.3s remaining: 0.1s\n",
"[Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 0.3s remaining: 0.0s\n",
"[Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 0.3s finished\n"
]
}
],
"source": [
"with joblib.Parallel(n_jobs=2, verbose=100) as p: \n",
" results = p(joblib.delayed(lambda x: large_array[x].sum())(i) for i in range(10))"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[numpy.memmap, numpy.memmap]"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"with joblib.Parallel(n_jobs=2) as p: \n",
" results = p(joblib.delayed(lambda x: type(large_array))(i) for i in range(2))\n",
"results "
]
},
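{
"cell_type": "markdown",
"metadata": {},
"source": [
"The auto-memmapping threshold can be tuned or disabled with the `max_nbytes` argument of `Parallel` (the default is about 1M); a sketch, not executed here."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# with max_nbytes=None the array is pickled instead of memmapped,\n",
"# so workers should see a plain numpy.ndarray rather than numpy.memmap\n",
"with joblib.Parallel(n_jobs=2, max_nbytes=None) as p:\n",
"    results = p(joblib.delayed(lambda x: type(large_array))(i) for i in range(2))\n",
"results"
]
},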
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### hypothesis for Jordan's question about passing fragments to processes"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"from copy import deepcopy\n",
"x = np.arange(100)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 22,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"x.flags['OWNDATA']"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"False"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"x[20:30].flags['OWNDATA']"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"False"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"x[:].flags['OWNDATA']"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"True"
]
},
"execution_count": 15,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"y = deepcopy(x[20:30])\n",
"y.flags['OWNDATA']"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"[True, True]"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"with joblib.Parallel(n_jobs=2) as p:\n",
" result = p(joblib.delayed(lambda x: x.flags['OWNDATA'])(y) for y in [x[10:20], x[20:30]])\n",
"result"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Summary on joblib\n",
"\n",
"- can practically do one thing, but is very convenient for it"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Ok, that's not a problem, only necessary part of an array is passed"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}