
@everilae
Last active March 26, 2024 18:49
Threaded generator wrapper for Python

```python
# A simple generator wrapper, not sure if it's good for anything at all.

# With basic Python threading
from threading import Thread

try:
    from queue import Queue
except ImportError:
    from Queue import Queue

# ... or use multiprocessing versions
# WARNING: use sentinel based on value, not identity
from multiprocessing import Process, Queue as MpQueue


class ThreadedGenerator(object):
    """
    Generator that runs on a separate thread, returning values to the calling
    thread. Care must be taken that the iterator does not mutate any shared
    variables referenced in the calling thread.
    """

    # Thread and Queue are injectable, e.g. to swap in the multiprocessing
    # versions imported above.
    def __init__(self, iterator,
                 sentinel=object(),
                 queue_maxsize=0,
                 daemon=False,
                 Thread=Thread,
                 Queue=Queue):
        self._iterator = iterator
        self._sentinel = sentinel
        self._queue = Queue(maxsize=queue_maxsize)
        self._thread = Thread(
            name=repr(iterator),
            target=self._run
        )
        self._thread.daemon = daemon

    def __repr__(self):
        return 'ThreadedGenerator({!r})'.format(self._iterator)

    def _run(self):
        # Producer: push every value, then always push the sentinel (even if
        # the iterator raises) so the consumer is not left waiting.
        try:
            for value in self._iterator:
                self._queue.put(value)
        finally:
            self._queue.put(self._sentinel)

    def __iter__(self):
        # Consumer: start the producer and yield values until the sentinel.
        self._thread.start()
        for value in iter(self._queue.get, self._sentinel):
            yield value
        # join() is only reached if the caller exhausts this generator. If
        # iteration stops early and the queue is bounded, the producer stays
        # blocked in put(), and a non-daemon thread then blocks interpreter exit.
        self._thread.join()
```

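The multiprocessing WARNING in the code comes from how sentinels travel through a process queue: `multiprocessing.Queue` pickles every item, and the two-argument form `iter(callable, sentinel)` stops only when a returned value compares equal (`==`) to the sentinel. A plain `object()` compares equal only to itself, so its unpickled copy never matches. The small illustration below uses `pickle` directly as a stand-in for the queue's serialization and a `deque` in place of a real queue; the helper names `round_trip` and `drain` are hypothetical and no processes are started.

```python
import pickle
from collections import deque


def round_trip(value):
    # multiprocessing.Queue pickles each item it carries; simulate that here.
    return pickle.loads(pickle.dumps(value))


def drain(items, sentinel):
    # Same stopping rule as ThreadedGenerator.__iter__: iter(callable, sentinel)
    # keeps calling until a returned value compares equal to the sentinel.
    buffer = deque(items)
    return list(iter(buffer.popleft, sentinel))


identity_sentinel = object()  # ThreadedGenerator's default sentinel
value_sentinel = None         # still equal to itself after unpickling

# The unpickled copy is a different object, so it no longer compares equal.
print(round_trip(identity_sentinel) == identity_sentinel)  # False

# A value-based sentinel survives the round trip and ends iteration cleanly.
received = [round_trip(v) for v in (1, 2, 3, value_sentinel)]
print(drain(received, value_sentinel))  # [1, 2, 3]

# With object(), the copied sentinel is treated as ordinary data; here the
# deque runs dry (IndexError), whereas a real queue.get() would block forever.
received = [round_trip(v) for v in (1, 2, 3, identity_sentinel)]
try:
    drain(received, identity_sentinel)
except IndexError:
    print("sentinel never matched")
```

So when swapping in `Process` and `MpQueue`, passing a value-based sentinel such as `sentinel=None` avoids the problem, provided `None` never appears as real data.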

monkeytruffle commented Mar 23, 2024

Hi @everilae - I know this gist is quite a few years old, but I still wanted to thank you for sharing it! I was working on something similar when I found this gist, and it was helpful to see what you had done. In case you are interested, I have just posted mine here.

@Kaiserouo - I found your comments very useful. I was already keen to make things as watertight as possible, especially when consuming from the generator in a way that might not exhaust it. Because the code cannot know how many values will be consumed, the only way is to provide a close() method, preferably with the class acting as a context manager that invokes it.

My version would therefore write your example as:

with ThreadedGenerator(range(1000000), maxq=1) as tg:
    print(list(zip(range(6), tg)))
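The close-plus-context-manager approach described above can be sketched as follows. monkeytruffle's actual version is only linked from the comment and is not reproduced here, so this is a hypothetical illustration, not their code: the class name `ClosableThreadedGenerator`, the `maxsize` parameter (standing in for `maxq` in the example) and the `_put` helper are all assumptions. The idea is that `close()` signals the worker through a `threading.Event`, and the worker calls `Queue.put` with a timeout so it can notice that signal instead of blocking forever on a full queue. Unlike the gist's class, this sketch starts its worker in `__init__`.

```python
import queue
import threading


class ClosableThreadedGenerator:
    """Hypothetical sketch: runs `iterable` on a worker thread and can be
    closed early, either explicitly or by leaving a `with` block."""

    _DONE = object()  # end-of-stream marker (identity is fine in one process)

    def __init__(self, iterable, maxsize=0):
        self._iterable = iterable
        self._queue = queue.Queue(maxsize=maxsize)
        self._stop = threading.Event()
        # daemon=True is only a safety net in case close() is never called.
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        try:
            for value in self._iterable:
                if not self._put(value):
                    return  # closed by the consumer: stop producing
        finally:
            self._put(self._DONE)

    def _put(self, item):
        # Retry with a short timeout so a blocked producer notices close().
        while not self._stop.is_set():
            try:
                self._queue.put(item, timeout=0.05)
                return True
            except queue.Full:
                pass
        return False

    def __iter__(self):
        while True:
            item = self._queue.get()
            if item is self._DONE:
                return
            yield item

    def close(self):
        # Signal the worker and wait for it; safe after normal exhaustion too.
        self._stop.set()
        self._thread.join()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not suppress exceptions


with ClosableThreadedGenerator(range(1000000), maxsize=1) as tg:
    print(list(zip(range(6), tg)))
# [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)]
```

Polling with a timeout is only one design choice; another is for close() to drain the queue so that a blocked put() can complete. The sketch is also single-use: iterating it again after the end-of-stream marker has been consumed would block.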
