Skip to content

Instantly share code, notes, and snippets.

@nhumrich
Last active September 8, 2016 03:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nhumrich/dc9df57a24e9a75860a5b4853a4b7dc9 to your computer and use it in GitHub Desktop.
Save nhumrich/dc9df57a24e9a75860a5b4853a4b7dc9 to your computer and use it in GitHub Desktop.
python asyncio drain feature patch
diff -r c2212d98ef13 Lib/asyncio/queues.py
--- a/Lib/asyncio/queues.py Wed Sep 07 14:56:15 2016 -0700
+++ b/Lib/asyncio/queues.py Wed Sep 07 21:56:33 2016 -0600
@@ -1,13 +1,13 @@
"""Queues"""
-__all__ = ['Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty']
+__all__ = ['Queue', 'PriorityQueue', 'LifoQueue',
+ 'QueueFull', 'QueueEmpty', 'QueueClosed']
import collections
import heapq
from . import compat
from . import events
-from . import futures
from . import locks
from .coroutines import coroutine
@@ -26,6 +26,11 @@
pass
+class QueueClosed(Exception):
+ """Exception raised by Queue.get() and Queue.put()
+ when Queue.close() or Queue.drain() is called"""
+
+
class Queue:
"""A queue, useful for coordinating producer and consumer coroutines.
@@ -47,12 +52,15 @@
# Futures.
self._getters = collections.deque()
- # Futures.
self._putters = collections.deque()
+ self._drainer = None
+
self._unfinished_tasks = 0
self._finished = locks.Event(loop=self._loop)
self._finished.set()
self._init(maxsize)
+ self.is_closed = False
+ self.is_draining = False
# These three are overridable in subclasses.
@@ -75,6 +83,12 @@
waiter.set_result(None)
break
+ def _wakeup_all(self):
+ # wake up all waiters (if any) that aren't cancelled.
+ for waiters in (self._putters, self._getters):
+ while waiters:
+ self._wakeup_next(waiters)
+
def __repr__(self):
return '<{} at {:#x} {}>'.format(
type(self).__name__, id(self), self._format())
@@ -128,6 +142,8 @@
This method is a coroutine.
"""
while self.full():
+ if self.is_closed or self.is_draining:
+ raise QueueClosed
putter = self._loop.create_future()
self._putters.append(putter)
try:
@@ -146,6 +162,8 @@
If no free slot is immediately available, raise QueueFull.
"""
+ if self.is_draining or self.is_closed:
+ raise QueueClosed
if self.full():
raise QueueFull
self._put(item)
@@ -162,6 +180,16 @@
This method is a coroutine.
"""
while self.empty():
+ if self.is_closed:
+ raise QueueClosed
+ if self.is_draining:
+ if self._drainer:
+ self._drainer.set_result(None)
+ self._drainer = None
+ self.is_draining = False
+ self.is_closed = True
+ raise QueueClosed
+
getter = self._loop.create_future()
self._getters.append(getter)
try:
@@ -218,6 +246,45 @@
if self._unfinished_tasks > 0:
yield from self._finished.wait()
+ @coroutine
+ def drain(self):
+ """ Closes the queue and lets the queue drain.
+ Waits until queue is empty before returning.
+
+ Any following calls to Queue.put() or Queue.put_nowait() will raise
+ a QueueClosed Exception. Following calls to Queue.get() will succeed
+ until the queue is empty. Once the queue is empty, Queue.get() will
+ raise a QueueClosed exception.
+
+ Raises QueueClosed if the queue is already being drained or is closed.
+ """
+ if self.is_draining:
+ raise QueueClosed
+ self.drain_nowait()
+ yield from self.join()
+
+ def drain_nowait(self):
+ """Closes the queue and lets the queue drain.
+ Does not wait for the queue to be drained before returning.
+ """
+ if self.empty():
+ self.is_draining = False
+ self.is_closed = True
+ else:
+ self.is_draining = True
+
+ self._wakeup_all()
+
+ def close(self):
+ """ Closes the queue immediately, preventing all puts or gets.
+
+ Any call to Queue.get(), Queue.put(), or Queue.put_nowait() will
+ raise a QueueClosed exception.
+ """
+ self.is_closed = True
+ self.is_draining = False
+ self._wakeup_all()
+
class PriorityQueue(Queue):
"""A subclass of Queue; retrieves entries in priority order (lowest first).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment