Skip to content

Instantly share code, notes, and snippets.

@rask
Last active April 24, 2018 15:56
Show Gist options
  • Save rask/1ec45c0dca341cfdeafc21d95409cf67 to your computer and use it in GitHub Desktop.
Save rask/1ec45c0dca341cfdeafc21d95409cf67 to your computer and use it in GitHub Desktop.
async compatible KeyedPool for Python
#!/usr/bin/env python
import asyncio
from keyed_pool import KeyedPool
async def produce(pool):
await pool.put('foo', 'bar')
await pool.put('hello', 'world')
await asyncio.sleep(4)
await pool.put('foo', 'baz')
async def consume(pool):
await asyncio.sleep(2)
fooval = await pool.get('foo')
helloval = await pool.get('hello')
await asyncio.sleep(3)
newfooval = await pool.get('foo')
print('{} {} {}'.format(fooval, helloval, newfooval))
if __name__ == "__main__":
loop = asyncio.get_event_loop()
pool = KeyedPool(loop)
tasks = asyncio.gather(produce(pool), consume(pool))
loop.run_until_complete(tasks)
import asyncio
import collections
class KeyCollision(KeyError):
pass
class KeyedPool():
"""
A pool from which items can be awaited by key. References to asyncio.Queue
are prominent in this one.
"""
def __init__(self, loop: asyncio.AbstractEventLoop):
"""
Inits.
"""
self._max_size = 1024
self._loop = loop
self._pool = {}
self._getters = collections.deque()
self._putters = collections.deque()
def _do_next(self, waiters):
"""
Trigger an action for a collection of waiting Futures.
"""
while waiters:
waiter = waiters.popleft()
if not waiter.done():
waiter.set_result(None)
break
def has_key(self, key: str) -> bool:
"""
Check if a key is defined for this pool.
"""
return key in self._pool.keys()
def has_items(self) -> int:
"""
Check if the pool has any items.
"""
return bool(self._pool.keys())
def is_empty(self):
"""
Check if the pool is empty.
"""
return not self.has_items()
def is_full(self):
"""
Is the pool full?
"""
return self._max_size <= len(self._pool)
async def put(self, key: str, item):
"""
Put a new item into the pool.
"""
while self.is_full():
putter = self._loop.create_future()
self._putters.append(putter)
try:
await putter
except:
putter.cancel() # Just in case putter is not done yet.
try:
# Clean self._putters from canceled putters.
self._putters.remove(putter)
except ValueError:
# The putter could be removed from self._putters by a
# previous get_nowait call.
pass
if not self.is_full() and not putter.cancelled():
# We were woken up by get_nowait(), but can't take
# the call. Wake up the next in line.
self._do_next(self._putters)
raise
if self.has_key(key):
raise KeyCollision()
self._put(key, item)
def _put(self, key: str, item):
"""
Actually put.
"""
self._pool[key] = item
self._do_next(self._getters)
async def get(self, key: str):
"""
Get an item from the pool by key.
"""
while self.is_empty() or not self.has_key(key):
getter = self._loop.create_future()
self._getters.append(getter)
try:
await getter
except:
getter.cancel() # Just in case getter is not done yet.
try:
# Clean self._getters from canceled getters.
self._getters.remove(getter)
except ValueError:
# The getter could be removed from self._getters by a
# previous put_nowait call.
pass
if not self.is_empty() and not getter.cancelled():
# We were woken up by put_nowait(), but can't take
# the call. Wake up the next in line.
self._do_next(self._getters)
raise
return self._get(key)
def _get(self, key: str):
"""
Actually get.
"""
p = self._pool
item = p.get(key)
del p[key]
self._pool = p
self._do_next(self._putters)
return item
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment