Last active
July 5, 2022 10:33
-
-
Save internetimagery/6456eea3e239df4963752417afe509fb to your computer and use it in GitHub Desktop.
Wrap Iterators so they can be sent to multiprocessing managers. For example SyncManager.Pool
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Permission to use, copy, modify, and/or distribute this software for any purpose with or without | |
# fee is hereby granted. | |
# | |
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH REGARD TO | |
# THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE | |
# AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER | |
# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE | |
# OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
# Pickle iterator for multiprocessing.Manager | |
import pickle | |
import threading | |
from six.moves.queue import Queue | |
from multiprocessing.managers import MakeProxyType | |
def register_senditer(manager_cls): | |
manager_cls.register("SendIter", SendIter, SendIterProxy) | |
class SendIter(object): | |
def __init__(self, size=0): | |
# Maintain queue entirely on manager side | |
self._queue = Queue(size) | |
def put(self, item): | |
self._queue.put(item) | |
def __iter__(self): | |
return self | |
def __next__(self): | |
item = self._queue.get() | |
if item is StopIteration: | |
raise StopIteration() | |
return item | |
next = __next__ # py2 | |
class SendIterProxy(MakeProxyType("BaseSendIterProxy", ("__next__", "put"))): | |
def __iter__(self): | |
return self | |
def __call__(self, iterable): | |
# Proxy to handle passing data into the managers queue | |
def feed(): | |
try: | |
for item in iterable: | |
self._callmethod("put", (item,)) | |
finally: | |
self._callmethod("put", (StopIteration,)) | |
threading.Thread(target=feed).start() | |
return self | |
# DEMO | |
from multiprocessing.managers import SyncManager | |
register_senditer(SyncManager) | |
def double(num): | |
for i in range(num): | |
yield i * 2 | |
if __name__ == "__main__": | |
man = SyncManager() | |
man.start() | |
pool = man.Pool() | |
wrap = man.SendIter() # Wrap our iterator so we can send it to the manager | |
print(pool.map(str, wrap(double(5)))) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment