Skip to content

Instantly share code, notes, and snippets.

@vfreex
Created September 13, 2023 11:44
Show Gist options
  • Save vfreex/bd89622de7a9bd528c3f2c6720415d13 to your computer and use it in GitHub Desktop.
Save vfreex/bd89622de7a9bd528c3f2c6720415d13 to your computer and use it in GitHub Desktop.
redis_lock_with_auto_extend.py
import asyncio
import os
import redis.asyncio as redis
from redis.asyncio.lock import Lock
from redis.asyncio.retry import Retry
from redis.backoff import ExponentialBackoff
async def long_running_task(duration: float = 60):
    """Simulate a slow job that must run while the lock is held.

    Prints a start message and then sleeps without blocking the event loop.

    Args:
        duration: Seconds to sleep. Defaults to 60, the value that used to
            be hard-coded, so existing zero-argument callers are unaffected.
    """
    print("Job is running...")
    await asyncio.sleep(duration)
async def auto_extend_lock(lock: "Lock", when: float, new: float):
    """Periodically extend *lock* until it is lost or this task is cancelled.

    Each cycle sleeps ``when`` seconds, confirms the lock is still owned and
    then calls ``lock.extend(new, replace_ttl=True)``. The coroutine is meant
    to run as a background task that the caller cancels once the protected
    work has finished.

    Args:
        lock: Lock exposing awaitable ``owned()`` and ``extend()`` methods.
            The annotation is quoted so it is not evaluated when the
            function is defined.
        when: Seconds to wait between extensions. Callers should choose a
            value below the lock's TTL.
        new: Value forwarded to ``lock.extend`` as the new TTL.

    Raises:
        asyncio.CancelledError: Re-raised when the task is cancelled.
    """
    try:
        while True:
            await asyncio.sleep(when)
            # Ownership is checked *after* sleeping: the lock may have expired
            # or been released while we slept, and extending a lock that is no
            # longer ours would fail inside this background task.
            if not await lock.owned():
                print("Lock is no longer owned; stopping auto-extension")
                break
            print("Extending lock...")
            # NOTE: extend() can still raise if the lock is lost between the
            # ownership check and this call; that error propagates to whoever
            # awaits this task.
            await lock.extend(new, replace_ttl=True)
            print("Lock extended")
    except asyncio.CancelledError:
        print("Lock auto-extension cancelled")
        raise
    finally:
        print("Lock auto-extension stopped")
async def main():
    """Run the long job under a Redis lock that is extended in the background.

    Reads the Redis password from the ``REDIS_TOKEN`` environment variable,
    connects over TLS, acquires ``test-lock`` (10 s TTL, waiting up to 3 s),
    and starts ``auto_extend_lock`` as a background task while
    ``long_running_task`` runs. The whole sequence is bounded by a 120 s
    timeout.

    Raises:
        KeyError: If ``REDIS_TOKEN`` is not set in the environment.
    """
    password = os.environ['REDIS_TOKEN']
    retry = Retry(ExponentialBackoff(), 3)
    # Tracks whether the lock was ever held, so the release message is not
    # printed when connecting or acquiring failed.
    lock_acquired = False
    try:
        async with asyncio.timeout(120):
            conn = redis.Redis(
                host="master.redis.gwprhd.use1.cache.amazonaws.com", port=6379,
                password=password, ssl=True, decode_responses=True, protocol=3,
                retry=retry, retry_on_error=[redis.BusyLoadingError, redis.ConnectionError, redis.TimeoutError])
            async with conn:
                async with Lock(conn, "test-lock", blocking_timeout=3, timeout=10) as lock:
                    lock_acquired = True
                    print("Lock acquired")
                    # Extend at 60% of the TTL so the lock is refreshed well
                    # before it would expire.
                    auto_extend_task = asyncio.create_task(auto_extend_lock(lock, when=10 * 0.6, new=10))
                    try:
                        await long_running_task()
                    finally:
                        # Always stop the extender, even when the job fails or
                        # the timeout cancels us, and wait for it to finish so
                        # it cannot touch the lock or connection after they
                        # are released.
                        auto_extend_task.cancel()
                        (outcome,) = await asyncio.gather(auto_extend_task, return_exceptions=True)
                        # A cancelled extender yields CancelledError, which is
                        # not an Exception subclass; anything else is a real
                        # failure worth reporting.
                        if isinstance(outcome, Exception):
                            print(f"Lock auto-extension failed: {outcome!r}")
    except TimeoutError:
        print("Job timed out")
    finally:
        if lock_acquired:
            print("Lock released")
# Script entry point: start an event loop and drive main() to completion
# only when this file is executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment