Skip to content

Instantly share code, notes, and snippets.

@Cyber-Neuron
Created March 19, 2017 16:37
Show Gist options
  • Save Cyber-Neuron/1342e0bdf854a375c1cafefadf5248f1 to your computer and use it in GitHub Desktop.
Save Cyber-Neuron/1342e0bdf854a375c1cafefadf5248f1 to your computer and use it in GitHub Desktop.
Using iterators and generators in multi-threaded applications
Using iterators and generators in multi-threaded applications
24 May 2012 – Bangalore
Python iterators and generators have almost the same behavior, but there are subtle differences, especially when the iterator/generator is used in a multi-threaded application.
Here is an example to demonstrate that behavior.
import threading
def count():
i = 0
while True:
i += 1
yield i
class Counter:
def __init__(self):
self.i = 0
def __iter__(self):
return self
def next(self):
self.i += 1
return self.i
def loop(func, n):
"""Runs the given function n times in a loop.
"""
for i in range(n):
func()
def run(f, repeats=1000, nthreads=10):
"""Starts multiple threads to execute the given function multiple
times in each thread.
"""
# create threads
threads = [threading.Thread(target=loop, args=(f, repeats))
for i in range(nthreads)]
# start threads
for t in threads:
t.start()
# wait for threads to finish
for t in threads:
t.join()
def main():
c1 = count()
c2 = Counter()
# call c1.next 100K times in 2 different threads
run(c1.next, repeats=100000, nthreads=2)
print "c1", c1.next()
# call c2.next 100K times in 2 different threads
run(c2.next, repeats=100000, nthreads=2)
print "c2", c2.next()
if __name__ == "__main__":
main()
And here is the output.
Exception in thread Thread-2:
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 522, in __bootstrap_inner
self.run()
File "/System/Library/Frameworks/Python.framework/Versions/2.6/lib/python2.6/threading.py", line 477, in run
self.__target(*self.__args, **self.__kwargs)
File "count.py", line 22, in loop
func()
ValueError: generator already executing
c1 112106
c2 158368
The generator case failed because generators can be shared between threads, but they cannot be resumed from two threads at the same time. It means two threads try to call next method on the generator at the same time, it will raise an exception.
In the iterator case, it only creates a race condition as multiple threads are trying to update self.i at the same time. That is the reason for seeing wrong output, and it will change everytime we run the program.This can be easily fixed by protecting that of code using a lock.
class Counter:
def __init__(self):
self.i = 0
# create a lock
self.lock = threading.Lock()
def __iter__(self):
return self
def next(self):
# acquire/release the lock when updating self.i
with self.lock:
self.i += 1
return self.i
If we run the program now, we’ll get the excpected value for c2.
$ python count.py
...
c2 200001
The similar approach won’t work for generators as we don’t have control over the calling of next method. Whatever changes we make to the generator function, multiple threads can still call the next method at the same time.
The only way to fix it is by wrapping it in an iterator and have a lock that allows only one thread to call next method of the generator.
class threadsafe_iter:
"""Takes an iterator/generator and makes it thread-safe by
serializing call to the `next` method of given iterator/generator.
"""
def __init__(self, it):
self.it = it
self.lock = threading.Lock()
def __iter__(self):
return self
def next(self):
with self.lock:
return self.it.next()
Now you can take any iterator or generator and make it thread-safe by wrapping it with threadsafe_iter.
# thread unsafe generator
c1 = count()
# now it is thread-safe
c1 = threadsafe_iter(c1)
This can be made still easier by writing a decorator.
def threadsafe_generator(f):
"""A decorator that takes a generator function and makes it thread-safe.
"""
def g(*a, **kw):
return threadsafe_iter(f(*a, **kw))
return g
Now we can use this decorator to make any generator thread-safe.
@threadsafe_generator
def count():
i = 0
while True:
i += 1
yield i
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment