Created
December 7, 2016 13:50
-
-
Save bofm/01a340d24ddfa7dacf8fbdfe5edaec69 to your computer and use it in GitHub Desktop.
Enhanced Python queue with additional .getall(), .clear() and .close() methods.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading
from collections import deque
from queue import Empty, Full
from time import monotonic as time
class QueueClosed(Exception):
    """Signal that a queue operation was attempted on, or interrupted by, a closed queue."""
class MyQueue:
    """FIFO queue patterned after ``queue.Queue`` with ``getall``, ``clear`` and ``close``.

    All state is guarded by a single lock (``mutex``) shared by three
    condition variables:

    * ``not_empty`` -- signalled when an item is added,
    * ``not_full`` -- signalled when space is freed,
    * ``all_tasks_done`` -- signalled when ``unfinished_tasks`` drops to zero.

    Once ``close()`` has been called, ``put``, ``get`` and ``getall`` raise
    ``QueueClosed`` and any threads blocked in them (or in ``join``) are
    woken up.
    """

    def __init__(self, maxsize=0):
        """Create a queue; ``maxsize <= 0`` means the queue is unbounded."""
        self.maxsize = maxsize
        self._init(maxsize)
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0
        self.closed = False

    def task_done(self):
        """Mark one previously enqueued item as fully processed.

        Raises:
            ValueError: if called more times than items were put.
        """
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished

    def join(self):
        """Block until every item has been marked done or the queue is closed."""
        with self.all_tasks_done:
            while self.unfinished_tasks and not self.closed:
                self.all_tasks_done.wait()

    def qsize(self):
        """Return the number of items currently stored."""
        with self.mutex:
            return self._qsize()

    def empty(self):
        """Return True if no items are currently stored."""
        with self.mutex:
            return not self._qsize()

    def full(self):
        """Return True if the queue is bounded and holds ``maxsize`` or more items."""
        with self.mutex:
            return 0 < self.maxsize <= self._qsize()

    def put_nowait(self, item):
        """Equivalent to ``put(item, block=False)``."""
        return self.put(item, block=False)

    def get_nowait(self):
        """Equivalent to ``get(block=False)``."""
        return self.get(block=False)

    # Storage primitives. They are only called with ``mutex`` held (except
    # ``_init``, which runs from ``__init__`` before the lock exists).

    def _init(self, maxsize):
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    def _put(self, item):
        self.queue.append(item)

    def _get(self):
        return self.queue.popleft()

    def close(self):
        """Mark the queue closed and wake every thread waiting on it."""
        with self.mutex:
            self.closed = True
            self.not_empty.notify_all()
            self.not_full.notify_all()
            self.all_tasks_done.notify_all()

    def _wait_for_items(self, block, timeout):
        """Wait until the queue is non-empty; the caller must hold ``mutex``.

        Raises:
            QueueClosed: if the queue is closed on entry or while waiting.
            Empty: if nothing is available (non-blocking) or the timeout expires.
            ValueError: if ``timeout`` is negative.
        """
        if self.closed:
            raise QueueClosed()
        if not block:
            if not self._qsize():
                raise Empty
        elif timeout is None:
            while not self._qsize() and not self.closed:
                self.not_empty.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            endtime = time() + timeout
            while not self._qsize() and not self.closed:
                remaining = endtime - time()
                if remaining <= 0.0:
                    raise Empty
                self.not_empty.wait(remaining)
        # close() may have been what woke us up.
        if self.closed:
            raise QueueClosed()

    def _wait_for_space(self, block, timeout):
        """Wait until a bounded queue has a free slot; the caller must hold ``mutex``.

        Only meaningful when ``maxsize > 0``.

        Raises:
            QueueClosed: if the queue is closed while waiting.
            Full: if no slot is free (non-blocking) or the timeout expires.
            ValueError: if ``timeout`` is negative.
        """
        if not block:
            if self._qsize() >= self.maxsize:
                raise Full
        elif timeout is None:
            while self._qsize() >= self.maxsize and not self.closed:
                self.not_full.wait()
        elif timeout < 0:
            raise ValueError("'timeout' must be a non-negative number")
        else:
            endtime = time() + timeout
            while self._qsize() >= self.maxsize and not self.closed:
                remaining = endtime - time()
                if remaining <= 0.0:
                    raise Full
                self.not_full.wait(remaining)
        # close() may have been what woke us up.
        if self.closed:
            raise QueueClosed()

    def getall(self, block=True, timeout=None):
        """Remove and return every queued item as a tuple, oldest first.

        Waiting behaviour matches ``get``: at least one item must be
        available before the call returns. ``unfinished_tasks`` is left
        unchanged, exactly as ``get`` leaves it.

        Raises:
            QueueClosed: if the queue is (or becomes) closed.
            Empty: if nothing is available (non-blocking) or the timeout expires.
            ValueError: if ``timeout`` is negative.
        """
        with self.not_empty:
            self._wait_for_items(block, timeout)
            items = tuple(self.queue)
            self.queue.clear()
            # Several slots may have been freed at once, so wake every
            # blocked producer rather than just one.
            self.not_full.notify_all()
            return items

    def clear(self):
        """Discard every queued item without returning it.

        Discarded items are subtracted from ``unfinished_tasks`` because no
        consumer will ever see them and call ``task_done()``; otherwise
        ``join()`` could never return after a ``clear()``.
        """
        with self.mutex:
            discarded = self._qsize()
            self.queue.clear()
            if discarded:
                # Clamp at zero in case task_done() was called for items
                # that were still sitting in the queue.
                self.unfinished_tasks = max(self.unfinished_tasks - discarded, 0)
                if not self.unfinished_tasks:
                    self.all_tasks_done.notify_all()
            # Several slots may have been freed at once.
            self.not_full.notify_all()

    def put(self, item, block=True, timeout=None):
        """Append ``item`` to the queue.

        For a bounded queue, waits for a free slot according to ``block``
        and ``timeout``.

        Raises:
            QueueClosed: if the queue is (or becomes) closed.
            Full: if no slot is free (non-blocking) or the timeout expires.
            ValueError: if ``timeout`` is negative.
        """
        with self.not_full:
            if self.closed:
                raise QueueClosed()
            if self.maxsize > 0:
                self._wait_for_space(block, timeout)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def get(self, block=True, timeout=None):
        """Remove and return the oldest item.

        Raises:
            QueueClosed: if the queue is (or becomes) closed.
            Empty: if nothing is available (non-blocking) or the timeout expires.
            ValueError: if ``timeout`` is negative.
        """
        with self.not_empty:
            self._wait_for_items(block, timeout)
            item = self._get()
            self.not_full.notify()
            return item
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment