Skip to content

Instantly share code, notes, and snippets.

@ipid
Last active January 29, 2023 08:17
Show Gist options
  • Save ipid/76717f58d7b55a26fc9682fcff50cc41 to your computer and use it in GitHub Desktop.
Save ipid/76717f58d7b55a26fc9682fcff50cc41 to your computer and use it in GitHub Desktop.
A closeable Python asyncio queue implementation.
import asyncio
from asyncio import Future
from collections import deque, namedtuple
# Result of CloseableQueue.get(): ``item`` is the dequeued element (None when
# the queue is closed and drained); ``isOpening`` is False only in that
# closed-and-drained case.
QueueGetResult = namedtuple('QueueGetResult', ['item', 'isOpening'])


class CloseableQueue:
    """An unbounded FIFO queue for asyncio that can be closed.

    Consumers call :meth:`get`, which suspends while the queue is empty and
    still open. Producers call :meth:`put`. After :meth:`close`, further
    ``put()`` calls are rejected; items that are already queued can still be
    retrieved, and once the queue is drained ``get()`` returns immediately
    with ``isOpening=False``.
    """

    def __init__(self):
        # Futures of the coroutines currently suspended in get(), oldest first.
        self._pending: deque[Future] = deque()
        # The items that have been put but not yet retrieved.
        self._queue = deque()
        # True until close() is called.
        self._isOpening = True

    def _wake_next(self) -> None:
        """Resolve the oldest waiter that is still pending, if there is one."""
        while self._pending:
            waiter = self._pending.popleft()
            # A waiter whose task was cancelled is already done; calling
            # set_result() on it would raise InvalidStateError, so skip it.
            if not waiter.done():
                waiter.set_result(None)
                return

    async def get(self) -> QueueGetResult:
        """Remove and return the oldest item, waiting for one if necessary.

        Returns:
            ``QueueGetResult(item, True)`` when an item was dequeued, or
            ``QueueGetResult(None, False)`` when the queue is closed and
            empty.

        Raises:
            asyncio.CancelledError: if the calling task is cancelled while
                waiting.
        """
        # Being woken up is only a hint: another get() may have taken the
        # item before this coroutine was resumed, so re-check in a loop.
        while not self._queue:
            if not self._isOpening:
                return QueueGetResult(item=None, isOpening=False)
            waiter = asyncio.get_running_loop().create_future()
            self._pending.append(waiter)
            try:
                await waiter
            except asyncio.CancelledError:
                waiter.cancel()  # No-op if the waiter is already done.
                try:
                    # Do not leave a dead future behind for put()/close().
                    self._pending.remove(waiter)
                except ValueError:
                    # put()/close() already popped it.
                    pass
                if self._queue and not waiter.cancelled():
                    # put() woke this waiter, but it cannot take the item;
                    # hand the wake-up to the next waiter in line.
                    self._wake_next()
                raise
        return QueueGetResult(item=self._queue.popleft(), isOpening=True)

    async def put(self, elem):
        """Append *elem* to the queue and wake one waiting ``get()``.

        Raises:
            RuntimeError: if the queue has been closed.
        """
        if not self._isOpening:
            raise RuntimeError('队列已经关闭,不能再添加元素')
        self._queue.append(elem)
        self._wake_next()

    def close(self):
        """Close the queue and wake every coroutine waiting in ``get()``.

        Calling this on an already closed queue has no further effect.
        """
        self._isOpening = False
        while self._pending:
            waiter = self._pending.popleft()
            # Skip waiters that were cancelled but not yet removed.
            if not waiter.done():
                waiter.set_result(None)

    def is_open(self) -> bool:
        """Return True as long as close() has not been called."""
        return self._isOpening
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment