@FanchenBao
Last active October 24, 2023 04:20
A custom MyQueue class that avoids `NotImplementedError` when calling the queue.qsize() method on macOS.
from multiprocessing.queues import Queue
import multiprocessing

# The following implementation of custom MyQueue to avoid NotImplementedError
# when calling queue.qsize() in MacOS X comes almost entirely from this github
# discussion: https://github.com/keras-team/autokeras/issues/368
# Necessary modification is made to make the code compatible with Python3.


class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value


class MyQueue(Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().

    Note the implementation of __getstate__ and __setstate__ which help to
    serialize MyQueue when it is passed between processes. If these functions
    are not defined, MyQueue cannot be serialized, which will lead to the error
    of "AttributeError: 'MyQueue' object has no attribute 'size'".
    See the answer provided here: https://stackoverflow.com/a/65513291/9723036

    For documentation of using __getstate__ and __setstate__ to serialize objects,
    refer to here: https://docs.python.org/3/library/pickle.html#pickling-class-instances
    """

    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = SharedCounter(0)

    def __getstate__(self):
        """Help to make MyQueue instance serializable.

        Note that we record the parent class state, which is the state of the
        actual queue, and the size of the queue, which is the state of MyQueue.
        self.size is a SharedCounter instance. It is itself serializable.
        """
        return {
            'parent_state': super().__getstate__(),
            'size': self.size,
        }

    def __setstate__(self, state):
        super().__setstate__(state['parent_state'])
        self.size = state['size']

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        self.size.increment(1)

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()

from my_queue import MyQueue
from multiprocessing import Process
from time import sleep
from random import randint

# A simple use case of the custom MyQueue that allows .qsize() method
# in MacOS X.


def foo(q):
    i = 0
    while True:
        q.put(f'current i = {i}')
        sleep(randint(0, 3))
        i += 1


if __name__ == '__main__':
    q: MyQueue = MyQueue()
    p: Process = Process(target=foo, args=(q,))
    p.start()
    times = 0
    while times < 5:
        print(f'current qsize = {q.qsize()}')
        if not q.empty():
            print(f'qsize = {q.qsize()} before get')
            print(f'Item got from queue: {q.get()}')
            print(f'qsize = {q.qsize()} after get')
            times += 1
        sleep(randint(0, 3))
    p.terminate()
    p.join()
    print(f'qsize = {q.qsize()} at the end')

@michael-manasian

@FanchenBao Thank you for your reply and for the code you provided.
I want to ask about one more point: instead of using a SharedCounter, one could store the multiprocessing.Value directly as an attribute of the queue itself. I have already tried this and the results were the same, so I would like to know what guided you when you moved the counting logic into a separate class. Is there any advantage to this?
There may be mistakes in my messages; I apologize for them, as I do not know English well.
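
For concreteness, the variant described in this question might look roughly like the sketch below. It is only an illustration under stated assumptions: the class name `InlineCounterQueue` and the attribute name `_item_count` are hypothetical and do not appear in the gist, and the pickling hooks simply mirror the ones MyQueue already uses.

```python
import multiprocessing
from multiprocessing.queues import Queue


class InlineCounterQueue(Queue):
    """Hypothetical variant: the shared counter is an attribute of the queue."""

    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        # Shared integer kept directly on the queue (assumed attribute name).
        self._item_count = multiprocessing.Value('i', 0)

    def __getstate__(self):
        # The Value has to travel with the queue when it is sent to a child.
        return {
            'parent_state': super().__getstate__(),
            'item_count': self._item_count,
        }

    def __setstate__(self, state):
        super().__setstate__(state['parent_state'])
        self._item_count = state['item_count']

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        # Hold the Value's lock for the whole read-modify-write.
        with self._item_count.get_lock():
            self._item_count.value += 1

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        with self._item_count.get_lock():
            self._item_count.value -= 1
        return item

    def qsize(self):
        return self._item_count.value
```

The behaviour is the same as MyQueue; the difference is that the locking and counting details now sit inside the queue's own methods.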

@FanchenBao

FanchenBao commented Mar 20, 2021

@MikeMight You are very welcome.

Regarding your question, putting multiprocessing.Value in the custom Queue class does work. However, doing so would couple the custom queue too tightly to the shared counter. In other words, we would be mixing two relatively independent concepts (queue and shared counter) together. This has three downsides.

  1. It complicates the logic for each concept (e.g. we will have to worry about whether the shared counter logic would break if we alter the features in the queue), which in turn makes testing and future code refactoring a giant pain.
  2. It discourages code re-use (the shared counter concept could be re-used elsewhere).
  3. It makes documentation more difficult.

To avoid the downsides mentioned above, I would prefer leaving the queue and the shared counter as two separate classes.
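
As a rough illustration of the re-use point, here is a minimal sketch in which the counter is used with no queue at all. The SharedCounter class is repeated so that the snippet runs on its own; the `record_jobs` worker, the variable names, and the number of processes are hypothetical and chosen only for the example.

```python
import multiprocessing


class SharedCounter(object):
    """Same idea as the SharedCounter in the gist, repeated to be self-contained."""

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        # Hold the lock so the read and the write happen as one step.
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        return self.count.value


def record_jobs(counter, n_jobs):
    """Hypothetical worker: count each finished job, no queue involved."""
    for _ in range(n_jobs):
        counter.increment()


if __name__ == '__main__':
    finished = SharedCounter()
    workers = [
        multiprocessing.Process(target=record_jobs, args=(finished, 100))
        for _ in range(4)
    ]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    print(f'jobs finished = {finished.value}')
```

Because the counter knows nothing about queues, it can be tested by itself and dropped into other code like this without touching MyQueue.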
