Skip to content

Instantly share code, notes, and snippets.

@internetimagery
Last active July 5, 2022 10:33
Show Gist options
  • Save internetimagery/6456eea3e239df4963752417afe509fb to your computer and use it in GitHub Desktop.
Save internetimagery/6456eea3e239df4963752417afe509fb to your computer and use it in GitHub Desktop.
Wrap iterators so they can be sent to multiprocessing managers, for example to a pool created with SyncManager.Pool.
# Permission to use, copy, modify, and/or distribute this software for any purpose with or without
# fee is hereby granted.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE
# AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# Pickle iterator for multiprocessing.Manager
import pickle
import threading
from six.moves.queue import Queue
from multiprocessing.managers import MakeProxyType
def register_senditer(manager_cls):
    """Register the "SendIter" type on *manager_cls*.

    ``SendIter`` is used as the factory for the shared object and
    ``SendIterProxy`` as the proxy type handed back to callers.
    Nothing is returned.
    """
    manager_cls.register(
        "SendIter",
        callable=SendIter,
        proxytype=SendIterProxy,
    )
class SendIter(object):
    """Iterator that yields whatever is pushed into it through ``put``.

    Items are buffered in a ``Queue``; iteration ends when the
    ``StopIteration`` class itself is received as an item.
    """

    def __init__(self, size=0):
        # Maintain queue entirely on manager side
        self._queue = Queue(maxsize=size)

    def __iter__(self):
        return self

    def put(self, item):
        """Append *item* to the internal queue."""
        self._queue.put(item)

    def __next__(self):
        """Return the next queued item, waiting until one is available.

        Raises StopIteration when the dequeued item is the
        ``StopIteration`` class, which serves as the end-of-stream marker.
        """
        received = self._queue.get()
        if received is not StopIteration:
            return received
        raise StopIteration()

    next = __next__  # py2
class SendIterProxy(MakeProxyType("BaseSendIterProxy", ("__next__", "put"))):
    """Proxy for a ``SendIter`` that lives inside a manager.

    ``__next__`` and ``put`` are forwarded to the referent. Calling the
    proxy with an iterable starts a background thread that pushes the
    iterable's items into the referent one ``put`` call at a time.
    """

    def __iter__(self):
        return self

    def next(self):
        """Python 2 spelling of ``__next__``, mirroring ``SendIter.next``."""
        return self.__next__()

    def __call__(self, iterable):
        """Start feeding *iterable* to the referent and return this proxy.

        The feeding happens in a separate thread, so this call returns
        immediately and the proxy can be passed on as the iterable to consume.
        """
        # Proxy to handle passing data into the managers queue
        def feed():
            try:
                for item in iterable:
                    self._callmethod("put", (item,))
            finally:
                # Always send the end marker, even if the iterable raised,
                # so a consumer waiting in SendIter.__next__ is released.
                self._callmethod("put", (StopIteration,))

        threading.Thread(target=feed).start()
        return self
# DEMO
# Example usage: a generator is wrapped and handed to a pool owned by a
# SyncManager.
from multiprocessing.managers import SyncManager
# Register the "SendIter" type on SyncManager at import time; the demo below
# relies on it when it calls man.SendIter().
register_senditer(SyncManager)
def double(num):
    """Yield 0, 2, 4, ... -- twice each integer in ``range(num)``."""
    for doubled in range(0, 2 * num, 2):
        yield doubled
if __name__ == "__main__":
    # Run the demo: map ``str`` over a generator streamed into a manager pool.
    man = SyncManager()
    man.start()
    try:
        pool = man.Pool()
        wrap = man.SendIter()  # Wrap our iterator so we can send it to the manager
        print(pool.map(str, wrap(double(5))))
    finally:
        # Stop the manager's server process even if the demo raised.
        man.shutdown()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment