Skip to content

Instantly share code, notes, and snippets.

@EdwinChan
Last active May 8, 2024 20:58
Show Gist options
  • Save EdwinChan/3c13d3a746bb3ec5082f to your computer and use it in GitHub Desktop.
Save EdwinChan/3c13d3a746bb3ec5082f to your computer and use it in GitHub Desktop.
Trick for using multiprocessing with nested functions and lambda expressions
import concurrent.futures
import multiprocessing
import sys
import uuid
def globalize(func):
def result(*args, **kwargs):
return func(*args, **kwargs)
result.__name__ = result.__qualname__ = uuid.uuid4().hex
setattr(sys.modules[result.__module__], result.__name__, result)
return result
def main():
@globalize
def func1(x):
return x
func2 = globalize(lambda x: x)
with multiprocessing.Pool() as pool:
print(pool.map(func1, range(10)))
print(pool.map(func2, range(10)))
with concurrent.futures.ThreadPoolExecutor() as executor:
print(list(executor.map(func1, range(10))))
print(list(executor.map(func2, range(10))))
if __name__ == '__main__':
main()
@komodovaran
Copy link

I must say, this fix works quite amazingly, even when one nested function uses another nested function!

@MaxLenormand
Copy link

This is pretty sweet!

I'm not quite sure what the globalize function does, would you mind giving a bit more detail of what's going on and why it works?
Thanks!

@EdwinChan
Copy link
Author

EdwinChan commented Feb 11, 2021

@MaxLenormand Thanks!

concurrent.futures and multiprocessing use pickle to package a function in order to send it to other processors, but this doesn't work for non-top-level functions. globalize effectively clones the function, gives the clone a unique name, and inserts the clone as a top-level function into the original function's module. The nice thing is that the clone retains the original function's context, allowing it to access variables that the original function can. It may be cumbersome to invoke the clone manually because of its funky UUID name, but pickle has no problem with that.

@MaxLenormand
Copy link

Very nice, I just learned a bunch of things about multiprocessing thanks!

@Antyos
Copy link

Antyos commented Mar 23, 2021

This is super cool and just the solution I need for a project I'm working on!

Have you tested it on Windows 10 / Python 3.9? I copied it as is, but was unable to get it to run. I got a bunch of these errors:

Process SpawnPoolWorker-1:
Traceback (most recent call last):
  File "%LOCALAPPDATA%\Programs\Python\Python39\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "%LOCALAPPDATA%\Programs\Python\Python39\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "%LOCALAPPDATA%\Programs\Python\Python39\lib\multiprocessing\pool.py", line 114, in worker
    task = get()
  File "%LOCALAPPDATA%\Programs\Python\Python39\lib\multiprocessing\queues.py", line 368, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute '9bd76f4f85eb4181828e6c8ddf699740' on <module '__mp_main__' from '%USERPROFILE%\\Documents\\Code\\nested.py'>

Note: I've subbed my username in the path for the windows environment variables.

I assume that error is because the @globalize decorator successfully made the proper UUIDs for the function, but Python was then unable to find the new functions. Unfortunately I don't understand enough of what's going on (or what needs to go on) to actually figure out what's wrong.

@EdwinChan
Copy link
Author

@Antyos I don't usually work with Windows, so take what I say below with a grain of salt.

It seems that, because a Windows process can't be forked, multiprocessing must be implemented differently. In particular, instead of each worker process having a copy-on-write view of the parent process, each child process imports the entire script from scratch. Consider

import multiprocessing
import sys
import uuid

def globalize(func):
  def result(*args, **kwargs):
    return func(*args, **kwargs)
  result.__name__ = result.__qualname__ = uuid.uuid4().hex
  setattr(sys.modules[result.__module__], result.__name__, result)
  return result

def func1(x):
  print(sorted(sys.modules['__main__'].__dict__))
  return x

func2 = globalize(func1)

if __name__ == '__main__':
  with multiprocessing.Pool(2) as pool:
    print(pool.map(func1, range(10)))

Here func1() allows us to spawn worker processes, but what we actually want to see is how the globalized func2() appears to each worker process. Running the script on Windows 10 with Portable Python 3.8.6 produces something like

['1f9b94a3e4324285afecabf9d606496c', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'func1', 'func2', 'globalize', 'multiprocessing', 'sys', 'uuid']
['1f9b94a3e4324285afecabf9d606496c', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'func1', 'func2', 'globalize', 'multiprocessing', 'sys', 'uuid']
['1f9b94a3e4324285afecabf9d606496c', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'func1', 'func2', 'globalize', 'multiprocessing', 'sys', 'uuid']
['05f6ae944a704d76a8774cb18475dd18', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'func1', 'func2', 'globalize', 'multiprocessing', 'sys', 'uuid']
['1f9b94a3e4324285afecabf9d606496c', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'func1', 'func2', 'globalize', 'multiprocessing', 'sys', 'uuid']
['05f6ae944a704d76a8774cb18475dd18', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'func1', 'func2', 'globalize', 'multiprocessing', 'sys', 'uuid']
['1f9b94a3e4324285afecabf9d606496c', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'func1', 'func2', 'globalize', 'multiprocessing', 'sys', 'uuid']
['05f6ae944a704d76a8774cb18475dd18', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'func1', 'func2', 'globalize', 'multiprocessing', 'sys', 'uuid']
['1f9b94a3e4324285afecabf9d606496c', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'func1', 'func2', 'globalize', 'multiprocessing', 'sys', 'uuid']
['05f6ae944a704d76a8774cb18475dd18', '__builtins__', '__cached__', '__doc__', '__file__', '__loader__', '__name__', '__package__', '__spec__', 'func1', 'func2', 'globalize', 'multiprocessing', 'sys', 'uuid']

Because the two worker processes import the script independently, there are two randomly generated names in the output; running the same script on Ubuntu yields only one randomly generated name. Another interesting thing of note is that the script fails immediately without the check __name__ == '__main__', suggesting that multiprocessing does import the script in the worker processes, and the check prevents multiprocessing.Pool() from being called in the worker processes themselves.

All this points to two changes that must be made to the original script: first, the name-mangling in globalize() must be deterministic, which may unfortunately lead to name collision down the road; second, the nested functions cannot be created in a function that is called only when __name__ == '__main__':

import concurrent.futures
import multiprocessing
import sys
import os.path

def globalize(func):
  def result(*args, **kwargs):
    return func(*args, **kwargs)
  result.__name__ = result.__qualname__ = (
    os.path.abspath(func.__code__.co_filename).replace('.', '') + '\0' +
    str(func.__code__.co_firstlineno))
  setattr(sys.modules[result.__module__], result.__name__, result)
  return result

def make_func():
  def func(x):
    return x
  return func

func1 = globalize(make_func())
func2 = globalize(lambda x: x)

if __name__ == '__main__':
  with multiprocessing.Pool() as pool:
    print(pool.map(func1, range(10)))
    print(pool.map(func2, range(10)))

  with concurrent.futures.ThreadPoolExecutor() as executor:
    print(list(executor.map(func1, range(10))))
    print(list(executor.map(func2, range(10))))

@Antyos
Copy link

Antyos commented Apr 2, 2021

@EdwinChan, Thanks for the fix! I vaguely understand how it works, though the more I delve into programming, the more I find Windows to be a pain for "reasons".

It's unfortunate that the nature of Windows puts a significant limitation on the usefulness of your globalize() decorator, but thankfully there's always WSL.

I wonder if it would be possible to create a local function in main(), then globalize it outside, e.g.

# This doesn't work as actual code, but maybe there's something here
def main():
  def func(x):
    return x

func1 = globalize(main.func)

Also, to address the possible namespace collisions, maybe you could use some hashing algorithm from hashlib like md5 or sha256.

@EdwinChan
Copy link
Author

@Antyos My pleasure!

Regarding your main() suggestion, you could do something like:

class main:
  def func(x):
    return x

However, there's no need to globalize(main.func): as long as the class main is visible when the script is imported as a module, multiprocessing and concurrent.futures can find its member main.func() just fine.

Regarding name collision, all lambda functions are called <lambda>, and we may also want to distinguish between lambda functions with the same code, so a smarter way of name mangling would be needed.

@jobh
Copy link

jobh commented Jun 15, 2021

This is a clever idea, but beware that the locals are captured and retained indefinitely by the module namespace, leading to a memory leak if it is called repeatedly. A variation that avoids the memory leak is to use a context manager rather than a decorator, like so:

from contextlib import contextmanager
import concurrent.futures
import multiprocessing
import sys
import uuid

@contextmanager
def globalized(func):
    namespace = sys.modules[func.__module__]
    name, qualname = func.__name__, func.__qualname__
    func.__name__ = func.__qualname__ = f'_{name}_{uuid.uuid4().hex}'
    setattr(namespace, func.__name__, func)
    try:
        yield
    finally:
        delattr(namespace, func.__name__)
        func.__name__, func.__qualname__ = name, qualname

def main():
  def func1(x):
    return x

  func2 = lambda x: x

  with globalized(func1), globalized(func2), multiprocessing.Pool() as pool:
    print(pool.map(func1, range(10)))
    print(pool.map(func2, range(10)))

  with concurrent.futures.ThreadPoolExecutor() as executor:
    print(list(executor.map(func1, range(10))))
    print(list(executor.map(func2, range(10))))

if __name__ == '__main__':
  main()

@pk1234dva
Copy link

pk1234dva commented Nov 30, 2021

@EdwinChan Thanks for sharing this. Sorry to bother but could you clarify a bit more why exactly this works? I'm not sure I get this right - are objects at the global level still pickled and unpickled, or does this solution rely on the fact that the process gets forked, and so it's directly accessible as a global object even in the forked process? (in unix systems)

It seems that as far as pickling functions goes, it's just the name that gets pickled, and so making the function global really just allows for that to happen - to pickle/unpickle the name. So it looks like this solution does heavily rely on the fact that, the function will exist in memory of the replicated process - due to either forking, or due to the fact that the function will deterministically recreated (windows). Do I get that right?

@EdwinChan
Copy link
Author

@pk1234dva As far as I understand it, multiprocessing pickles the function name so that the worker processes know where to start. The trick here simply automates the creation of wrappers that can be pickled. Nothing else is pickled, neither in the global scope nor in the local scope, so the module must still be available in some form for the program to work. Whether the worker processes inherit the module in memory as a result of a fork or import the module anew is mostly an implementation detail.

@pk1234dva
Copy link

@EdwinChan Thank you.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment