@FanchenBao
Last active October 24, 2023 04:20
A custom MyQueue class that avoids `NotImplementedError` when calling queue.qsize() method.
# my_queue.py
from multiprocessing.queues import Queue
import multiprocessing

# The following implementation of custom MyQueue to avoid NotImplementedError
# when calling queue.qsize() in MacOS X comes almost entirely from this github
# discussion: https://github.com/keras-team/autokeras/issues/368
# Necessary modification is made to make the code compatible with Python3.


class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/
    """

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value


class MyQueue(Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().

    Note the implementation of __getstate__ and __setstate__ which help to
    serialize MyQueue when it is passed between processes. If these functions
    are not defined, MyQueue cannot be serialized, which will lead to the error
    of "AttributeError: 'MyQueue' object has no attribute 'size'".
    See the answer provided here: https://stackoverflow.com/a/65513291/9723036

    For documentation of using __getstate__ and __setstate__ to serialize objects,
    refer to here: https://docs.python.org/3/library/pickle.html#pickling-class-instances
    """

    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = SharedCounter(0)

    def __getstate__(self):
        """Help to make MyQueue instance serializable.

        Note that we record the parent class state, which is the state of the
        actual queue, and the size of the queue, which is the state of MyQueue.
        self.size is a SharedCounter instance. It is itself serializable.
        """
        return {
            'parent_state': super().__getstate__(),
            'size': self.size,
        }

    def __setstate__(self, state):
        super().__setstate__(state['parent_state'])
        self.size = state['size']

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        self.size.increment(1)

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self.size.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()

from my_queue import MyQueue
from multiprocessing import Process
from time import sleep
from random import randint

# A simple use case of the custom MyQueue that allows .qsize() method
# in MacOS X.


def foo(q):
    i = 0
    while True:
        q.put(f'current i = {i}')
        sleep(randint(0, 3))
        i += 1


if __name__ == '__main__':
    q: MyQueue = MyQueue()
    p: Process = Process(target=foo, args=(q,))
    p.start()
    times = 0
    while times < 5:
        print(f'current qsize = {q.qsize()}')
        if not q.empty():
            print(f'qsize = {q.qsize()} before get')
            print(f'Item got from queue: {q.get()}')
            print(f'qsize = {q.qsize()} after get')
        times += 1
        sleep(randint(0, 3))
    p.terminate()
    p.join()
    print(f'qsize = {q.qsize()} at the end')

@arturzangiev

I get 'MyQueue' object has no attribute 'size'

@FanchenBao
Author

You need to call MyQueue.qsize()

@arturzangiev

arturzangiev commented Dec 1, 2020

I am trying to call it from inside of the process. For example:

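import time
from multiprocessing import Process

from my_queue import MyQueue


class Consumer(Process):
    def __init__(self, queue):
        self.q = queue
        super(Consumer,self).__init__()

    def run(self):
        # Poll the queue forever; print each message with the remaining size.
        while True:
            if not self.q.empty():
                message = self.q.get()
                print(self.name, message, self.q.qsize())
            else:
                time.sleep(0.01)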


if __name__ == '__main__':
    processes = []

    q = MyQueue()

    p = Producer(q)
    processes.append(p)
    p.start()

    for i in range(1):
        c = Consumer(q)
        processes.append(c)
        c.start()

    for proc in processes:
        proc.join()

@FanchenBao
Author

I tested the code myself, and you are correct: I hit the 'MyQueue' object has no attribute 'size' error as well. This must have something to do with the change in Python 3.8, where the default method for creating a process on macOS changed from "fork" to "spawn" (see doc), because when I ran the code under Python 3.7, it still worked.

Therefore, a temporary workaround is to place multiprocessing.set_start_method('fork') right below if __name__ == '__main__':. This forces Python 3.8+ to use "fork" instead of "spawn" when creating child processes, and your code should work.
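
A minimal sketch of that workaround, for illustration only. The helper name `force_fork` is hypothetical and not part of the gist. It checks that "fork" is available before selecting it, since Windows does not offer that start method. Note that the Python documentation describes "fork" as unsafe on macOS, so this is best treated as a stopgap.

```python
import multiprocessing


def force_fork():
    """Select the 'fork' start method if the platform offers it.

    Hypothetical helper for illustration. Returns True when 'fork' was
    selected and False when the platform does not support it.
    """
    if 'fork' not in multiprocessing.get_all_start_methods():
        return False  # e.g. Windows, which only supports 'spawn'
    # force=True lets the call succeed even if a start method was already set.
    multiprocessing.set_start_method('fork', force=True)
    return True


if __name__ == '__main__':
    force_fork()
    # ... create the MyQueue and start the processes here ...
```

The call has to happen before any queue or process is created, which is why it sits directly under the `if __name__ == '__main__':` guard.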

A more sustainable fix requires an understanding of why "spawn" fails to copy the self.size attribute. I do not have the answer, so I have asked for help on Stack Overflow (see question).

Finally, thank you very much for reporting this error.

@arturzangiev

Thanks. Just implemented it and it works.

@FanchenBao
Author

@arturzangiev This is an update on why the original code doesn't work in Python 3.8+. My question on SO has finally been answered. The problem with the original code is that the MyQueue object cannot be properly serialized when it is passed to spawned processes: under the default behavior, the size attribute is not included in the serialized state. By adding __getstate__() and __setstate__(), we dictate manually how the MyQueue object is serialized, so it can be used in spawned processes.

Forked processes apparently do not need to serialize the MyQueue object, because a forked child starts with a copy of the parent's memory and so already has the object, size attribute included.

The code in my_queue.py has been updated, and it works now both in Python 3.7 and 3.8+. The hack solution of forcing multiprocessing.set_start_method('fork') is no longer needed.
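
The serialization issue can be reproduced without starting any processes. The sketch below is an illustration under assumptions: `Base` is a hypothetical stand-in for multiprocessing's Queue, which defines its own __getstate__/__setstate__ covering only its own fields, and `Broken` and `Fixed` are made-up names mirroring MyQueue before and after the update.

```python
import pickle


class Base:
    """Stand-in for the parent queue: it pickles only its own fields."""

    def __init__(self):
        self.maxsize = 0

    def __getstate__(self):
        return (self.maxsize,)

    def __setstate__(self, state):
        (self.maxsize,) = state


class Broken(Base):
    """Adds an attribute that the inherited __getstate__ knows nothing about."""

    def __init__(self):
        super().__init__()
        self.size = 0


class Fixed(Broken):
    """Wraps the parent state and adds the extra attribute, as MyQueue does."""

    def __getstate__(self):
        return {'parent_state': super().__getstate__(), 'size': self.size}

    def __setstate__(self, state):
        super().__setstate__(state['parent_state'])
        self.size = state['size']


# Unpickling does not call __init__, so only the saved state is restored.
broken_copy = pickle.loads(pickle.dumps(Broken()))
fixed_copy = pickle.loads(pickle.dumps(Fixed()))
print(hasattr(broken_copy, 'size'))  # False: the attribute was dropped
print(hasattr(fixed_copy, 'size'))   # True
```

The "spawn" start method pickles the arguments passed to a child process, which is why the missing attribute only showed up there.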

@tortila

tortila commented Jan 20, 2021

This is exactly what I was looking for, thank you for sharing @FanchenBao!

There's this one detail and I'm sure whoever wants to use that code will quickly find the solution, but I'll mention it anyway: the multiprocessing.Queue has an optional maxsize argument. So it should be added in MyQueue constructor and passed to super().__init__ as well in order to keep the interfaces the same.
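
One way this suggestion might look, as a sketch rather than the gist's actual code. The class name `BoundedMyQueue` is hypothetical, and SharedCounter is repeated from my_queue.py so the example is self-contained.

```python
import multiprocessing
from multiprocessing.queues import Queue


class SharedCounter:
    """Same counter as in my_queue.py, repeated to keep this self-contained."""

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        return self.count.value


class BoundedMyQueue(Queue):
    """Hypothetical MyQueue variant that forwards maxsize to the parent."""

    def __init__(self, maxsize=0):
        # maxsize=0 keeps the parent's default of an unbounded queue.
        super().__init__(maxsize, ctx=multiprocessing.get_context())
        self.size = SharedCounter(0)

    def __getstate__(self):
        return {'parent_state': super().__getstate__(), 'size': self.size}

    def __setstate__(self, state):
        super().__setstate__(state['parent_state'])
        self.size = state['size']

    def put(self, *args, **kwargs):
        # If the queue is full, the parent raises before the counter changes.
        super().put(*args, **kwargs)
        self.size.increment(1)

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        self.size.increment(-1)
        return item

    def qsize(self):
        return self.size.value

    def empty(self):
        return not self.qsize()
```

With `BoundedMyQueue(maxsize=1)`, a second `put_nowait()` raises `queue.Full` as it would with the standard queue, and the counter is left untouched because the increment runs only after a successful put.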

@michael-manasian

michael-manasian commented Mar 19, 2021

Hello. Your class's docstrings indicate that the SharedCounter approach not only eliminates the error, but also provides more robust method implementations. What does that mean? I want to use the following code:

from multiprocessing import Value, get_context
from multiprocessing.queues import Queue as MultiprocessingQueue
from typing import Any


class SynchronizedCounter:
    def __init__(self) -> None:
        self.count = Value('i', 0)

    def change_count(self, number: int) -> None:
        with self.count.get_lock():
            self.count.value += number

    @property
    def value(self) -> int:
        return self.count.value


class Queue(MultiprocessingQueue):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, ctx=get_context(), **kwargs)
        self._counter = SynchronizedCounter()

    def __getstate__(self) -> dict[str, Any]:
        return {
            'parent_state': super().__getstate__(),
            '_counter': self._counter
        }

    def __setstate__(self, state: dict) -> None:
        super().__setstate__(state['parent_state'])
        self._counter = state['_counter']

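    def put(self, *args, **kwargs) -> None:
        # Count the item only after put() succeeds, so a queue.Full error
        # does not inflate the counter.
        super().put(*args, **kwargs)
        self._counter.change_count(1)

    def get(self, *args, **kwargs) -> Any:
        # Decrement only after an item has actually been retrieved.
        item = super().get(*args, **kwargs)
        self._counter.change_count(-1)
        return item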

    @property
    def count(self) -> int:
        return self._counter.value

    @property
    def is_empty(self) -> bool:
        return not self.count

without checking whether sys.platform == 'darwin'.
I mean, I want to use this queue all the time because it lets these methods be accessed as properties.
Will my code look ridiculous?

@FanchenBao
Author

@MikeMight What I mean by "robust method implementations" is this: the documentation for the standard Python Queue class states that Queue.qsize() is approximate (see doc) and that Queue.empty() is not reliable either (see doc). The queue implementation here tracks the queue size with a separate, lock-protected counter, so the values returned by qsize() and empty() should be reliable.

I tested your code by adapting it to the use case example, and it worked well. So I think your code is fine.

@michael-manasian

@FanchenBao Thank you for your reply and for the code you provided.
I want to ask about one more point: instead of using a SharedCounter, one could put multiprocessing.Value directly in the attributes of the queue itself. I have already tried this and the results were the same, but I would like to know what guided you when you moved the counting logic into a separate class. Is there any advantage to this?
There may be mistakes in my writing; I apologize for them, as I do not know English well.

@FanchenBao
Author

FanchenBao commented Mar 20, 2021

@MikeMight You are very welcome.

Regarding your question, putting multiprocessing.Value in the custom Queue class does work. However, by doing so, the custom queue would have too much coupling with the shared counter. In other words, we are mixing two relatively independent concepts (queue and shared counter) together. This has three downsides.

  1. It complicates the logic for each concept (e.g. we will have to worry about whether the shared counter logic would break if we alter the features in the queue), which in turn makes testing and future code refactoring a giant pain.
  2. It discourages code re-use (the shared counter concept could be re-used elsewhere).
  3. It makes documentation more difficult.

To avoid the downsides mentioned above, I would prefer leaving the queue and the shared counter as two separate classes.
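
As an illustration of the re-use point, the counter can be used on its own, with no queue involved. This is a sketch under assumptions: `count_events` and `run_workers` are hypothetical names, and threads stand in for processes to keep the example short. The lock obtained from multiprocessing.Value also serializes access between threads.

```python
import multiprocessing
import threading


class SharedCounter:
    """Same counter as in my_queue.py, repeated to keep this self-contained."""

    def __init__(self, n=0):
        self.count = multiprocessing.Value('i', n)

    def increment(self, n=1):
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        return self.count.value


def count_events(counter, n):
    """Hypothetical worker: record n events on the shared counter."""
    for _ in range(n):
        counter.increment()


def run_workers(num_workers=4, per_worker=1000):
    """Run several workers against one counter and return the final total."""
    counter = SharedCounter(0)
    workers = [
        threading.Thread(target=count_events, args=(counter, per_worker))
        for _ in range(num_workers)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    return counter.value


if __name__ == '__main__':
    print(run_workers())  # 4000
```

Because every increment happens under the lock, the total always equals num_workers * per_worker, and the counter can be tested like this without touching the queue class at all.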
