Created
March 19, 2017 16:37
Using iterators and generators in multi-threaded applications
24 May 2012 – Bangalore

Python iterators and generators have almost the same behavior, but there are subtle differences, especially when the iterator/generator is used in a multi-threaded application.

Here is an example to demonstrate that behavior.

    # Python 2 listing: uses the print statement and the iterator method name next()
    import threading

    def count():
        i = 0
        while True:
            i += 1
            yield i

    class Counter:
        def __init__(self):
            self.i = 0

        def __iter__(self):
            return self

        def next(self):
            self.i += 1
            return self.i

    def loop(func, n):
        """Runs the given function n times in a loop.
        """
        for i in range(n):
            func()

    def run(f, repeats=1000, nthreads=10):
        """Starts multiple threads to execute the given function multiple
        times in each thread.
        """
        # create threads
        threads = [threading.Thread(target=loop, args=(f, repeats))
                   for i in range(nthreads)]

        # start threads
        for t in threads:
            t.start()

        # wait for threads to finish
        for t in threads:
            t.join()

    def main():
        c1 = count()
        c2 = Counter()

        # call c1.next 100K times in 2 different threads
        run(c1.next, repeats=100000, nthreads=2)
        print "c1", c1.next()

        # call c2.next 100K times in 2 different threads
        run(c2.next, repeats=100000, nthreads=2)
        print "c2", c2.next()

    if __name__ == "__main__":
        main()

And here is the output.

    Exception in thread Thread-2:
    Traceback (most recent call last):
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 522, in __bootstrap_inner
        self.run()
      File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 477, in run
        self.__target(*self.__args, **self.__kwargs)
      File "count.py", line 22, in loop
        func()
    ValueError: generator already executing

    c1 112106
    c2 158368

The generator case failed because generators can be shared between threads, but they cannot be resumed from two threads at the same time. If two threads call the next method on the generator at the same time, the call that arrives while the generator is still running raises ValueError.
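
The same error can be reproduced without any threads, which makes the cause easier to see. What follows is a minimal sketch in Python 3 syntax (the listings in this article are Python 2); the names `reentrant` and `gen` are made up for illustration. The generator tries to resume itself while its own body is still running, which is the situation a second thread creates when it calls next before the first call has returned.

```python
def reentrant():
    # `gen` is the generator that is executing this very line, so
    # resuming it here is an attempt to run it twice at once.
    yield next(gen)


gen = reentrant()

try:
    next(gen)
except ValueError as exc:
    message = str(exc)
else:
    message = None

print(message)
```

The interpreter refuses the nested resume with `ValueError: generator already executing`, the same message seen in the threaded run above.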

In the iterator case, nothing fails outright; there is only a race condition, because multiple threads try to update self.i at the same time. That is why the output is wrong, and it changes every time we run the program. This can be easily fixed by protecting that section of code with a lock.

    class Counter:
        def __init__(self):
            self.i = 0
            # create a lock
            self.lock = threading.Lock()

        def __iter__(self):
            return self

        def next(self):
            # acquire/release the lock when updating self.i
            with self.lock:
                self.i += 1
                return self.i

If we run the program now, we’ll get the expected value for c2.

    $ python count.py
    ...
    c2 200001

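
For readers on Python 3, here is a hedged sketch of the same locked counter using the `__next__` method name and the built-in `next()`. The names `LockedCounter` and `hammer` are illustrative and not part of the original program, and the repeat count is reduced so that it runs quickly.

```python
import threading


class LockedCounter:
    """Iterator whose increments are serialized by a lock (Python 3 syntax)."""

    def __init__(self):
        self.i = 0
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        # The increment and the read of self.i both happen under the lock.
        with self.lock:
            self.i += 1
            return self.i


def hammer(counter, repeats, nthreads):
    """Call next(counter) `repeats` times from each of `nthreads` threads."""
    def worker():
        for _ in range(repeats):
            next(counter)

    threads = [threading.Thread(target=worker) for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


counter = LockedCounter()
hammer(counter, repeats=10000, nthreads=2)
final_value = next(counter)
print("final value:", final_value)
```

With the lock in place, the value after 2 x 10000 calls is always one more than the number of calls already made, regardless of how the threads interleave.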
A similar approach won’t work for generators, because we don’t control how the next method is called. Whatever changes we make to the generator function, multiple threads can still call the next method at the same time.

The way to fix it is to wrap the generator in an iterator that holds a lock, so that only one thread at a time can call the generator’s next method.

    class threadsafe_iter:
        """Takes an iterator/generator and makes it thread-safe by
        serializing calls to the `next` method of the given iterator/generator.
        """
        def __init__(self, it):
            self.it = it
            self.lock = threading.Lock()

        def __iter__(self):
            return self

        def next(self):
            with self.lock:
                return self.it.next()

Now you can take any iterator or generator and make it thread-safe by wrapping it with threadsafe_iter.

    # thread unsafe generator
    c1 = count()

    # now it is thread-safe
    c1 = threadsafe_iter(c1)

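
As a rough check that the wrapper does what is claimed, the sketch below ports it to Python 3 (`__next__` and the built-in `next()`) and pulls values from one shared generator in several threads. The class name `ThreadSafeIter` and the helper `collect` are assumptions made for this example. If calls are properly serialized, every number is handed out exactly once and no thread sees "generator already executing".

```python
import threading


class ThreadSafeIter:
    """Python 3 port of the wrapper: one lock around every next() call."""

    def __init__(self, it):
        self.it = iter(it)
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.it)


def count():
    i = 0
    while True:
        i += 1
        yield i


def collect(shared, repeats, nthreads):
    """Take `repeats` values from `shared` in each thread; return them sorted."""
    buckets = [[] for _ in range(nthreads)]

    def worker(bucket):
        for _ in range(repeats):
            bucket.append(next(shared))

    threads = [threading.Thread(target=worker, args=(b,)) for b in buckets]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(v for b in buckets for v in b)


values = collect(ThreadSafeIter(count()), repeats=5000, nthreads=4)
print(len(values), values[0], values[-1])
```

Each thread writes to its own list, so the only shared state is the wrapped generator itself.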
This can be made still easier by writing a decorator.

    def threadsafe_generator(f):
        """A decorator that takes a generator function and makes it thread-safe.
        """
        def g(*a, **kw):
            return threadsafe_iter(f(*a, **kw))
        return g

Now we can use this decorator to make any generator thread-safe.

    @threadsafe_generator
    def count():
        i = 0
        while True:
            i += 1
            yield i
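
A common use of such a decorator is to let several worker threads drain one finite generator with ordinary for loops. The following Python 3 sketch assumes a made-up generator `jobs` and a helper `drain`; it also adds `functools.wraps`, which the decorator above does not use, so that the decorated function keeps its name and docstring.

```python
import functools
import threading


class ThreadSafeIter:
    """Serializes next() calls on the wrapped iterator with a lock."""

    def __init__(self, it):
        self.it = iter(it)
        self.lock = threading.Lock()

    def __iter__(self):
        return self

    def __next__(self):
        with self.lock:
            return next(self.it)


def threadsafe_generator(f):
    """Decorator: calling f returns a lock-protected iterator."""
    @functools.wraps(f)
    def g(*a, **kw):
        return ThreadSafeIter(f(*a, **kw))
    return g


@threadsafe_generator
def jobs(n):
    """Yield job numbers 0..n-1 (a hypothetical workload)."""
    for i in range(n):
        yield i


def drain(shared, nthreads):
    """Consume `shared` from several threads; return all items seen, sorted."""
    seen = []
    seen_lock = threading.Lock()

    def worker():
        # The loop ends once the underlying generator is exhausted.
        for item in shared:
            with seen_lock:
                seen.append(item)

    threads = [threading.Thread(target=worker) for _ in range(nthreads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sorted(seen)


processed = drain(jobs(1000), nthreads=4)
print(len(processed))
```

Once the generator is exhausted, every further `next()` call raises StopIteration, so all worker loops finish and each job is processed by exactly one thread.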