Skip to content

Instantly share code, notes, and snippets.

@cderwin
Created September 26, 2016 19:14
Show Gist options
  • Save cderwin/2cda20e947de75b759699d291123e2cd to your computer and use it in GitHub Desktop.
Save cderwin/2cda20e947de75b759699d291123e2cd to your computer and use it in GitHub Desktop.
How to close a subscription and return its connection to the pool with asyncio_redis
class MessageIterator:
    """
    Async iterator for messages received on a single pub/sub channel.

    Only necessary as a separate class since async generators won't be
    introduced until 3.6.

    The subscription is opened lazily by the first ``__anext__`` call.
    Call ``close()`` when finished: it unsubscribes from the channel and
    resets the connection's pub/sub state so the connection can be reused.
    Once closed, further iteration stops with ``StopAsyncIteration``.
    """

    def __init__(self, redis_conn, channel_name):
        # redis_conn can also be a pool, they have the same api
        self.redis_conn = redis_conn
        self.channel_name = channel_name
        # Created on first __anext__; reset to None again by close().
        self.subscriber = None
        self._closed = False

    def __aiter__(self):
        return self

    async def __anext__(self):
        """Return the next message, subscribing to the channel on first use.

        Raises:
            StopAsyncIteration: if ``close()`` has already been called.
        """
        if self._closed:
            # The subscription has been torn down; never await a message on a
            # connection that has been handed back for reuse.
            raise StopAsyncIteration
        if self.subscriber is None:
            self.subscriber = await self.redis_conn.start_subscribe()
            await self.subscriber.subscribe([self.channel_name])
        return await self.subscriber.next_message()

    async def close(self):
        """Unsubscribe and take the underlying connection out of pub/sub mode.

        Safe to call before any message was requested (nothing to undo) and
        safe to call more than once.

        Raises:
            AssertionError: if the connection still reports subscribed
                channels or patterns after unsubscribing from this channel.
        """
        self._closed = True
        subscriber = self.subscriber
        if subscriber is None:
            # Never subscribed, or already closed.
            return

        await subscriber.unsubscribe([self.channel_name])

        # HACK: reset the protocol's private pub/sub state so we can reuse the
        # connection later.
        conn = subscriber.protocol
        # Explicit raise rather than an `assert` statement so the check is
        # still enforced when Python runs with -O.
        if conn._pubsub_channels or conn._pubsub_patterns:
            raise AssertionError('Cannot exit pubsub mode while subscribed to 1 or more channels')
        conn._in_pubsub = False
        conn._subscription = None
        self.subscriber = None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment