Skip to content

Instantly share code, notes, and snippets.

@fedej
Last active August 4, 2021 00:54
Show Gist options
  • Save fedej/8957769ff3db3b377d06f4bb74bcef77 to your computer and use it in GitHub Desktop.
Save fedej/8957769ff3db3b377d06f4bb74bcef77 to your computer and use it in GitHub Desktop.
Context var isolation test
import asyncio
import time
from contextvars import ContextVar
from typing import Optional
# Token of the lock held by the current context; ``None`` means no lock is
# held (``Lock.release`` refuses to run while this is ``None``).
TOKEN: ContextVar[Optional[str]] = ContextVar("token", default=None)
class Lock:
    """Toy lock that keeps the holder's token in the ``TOKEN`` context variable.

    Nothing is stored outside the process: ``expired`` is an
    ``asyncio.Event`` that the demo sets to simulate the lock key expiring,
    after which a waiting task may take the lock.
    """

    def __init__(self, name):
        """Create the lock and reset ``TOKEN`` to ``None`` in the current context.

        Args:
            name: Label used in the printed trace.
        """
        self.name = name
        # Explicitly clear the token in the context that creates the lock.
        TOKEN.set(None)
        self.expired = asyncio.Event()

    async def acquire(
        self,
        token: Optional[str] = None,
        acquire: bool = False,
        *,
        blocking_timeout: Optional[float] = 10,
        sleep: float = 1,
    ):
        """Poll until the lock can be taken, then store ``token`` in ``TOKEN``.

        The lock counts as available once ``expired`` is set, or immediately
        when ``acquire`` is true.

        Args:
            token: Value stored in ``TOKEN`` on success.
            acquire: When true, take the lock without waiting for ``expired``.
            blocking_timeout: Seconds to keep retrying before giving up;
                ``None`` retries indefinitely.
            sleep: Seconds to wait between attempts.

        Returns:
            ``True`` once the lock is taken, ``False`` if the next attempt
            would fall after the deadline.
        """
        # We are inside a coroutine, so a loop is guaranteed to be running.
        loop = asyncio.get_running_loop()
        stop_trying_at = (
            None if blocking_timeout is None else loop.time() + blocking_timeout
        )
        task = asyncio.current_task()
        while True:
            if self.expired.is_set() or acquire:
                print(f"{time.time()}: {task.get_name()} acquired lock {self.name}")
                print(f"{time.time()}: Token was {TOKEN.get()} in {task.get_name()}")
                TOKEN.set(token)
                print(f"{time.time()}: {task.get_name()} sets token to {TOKEN.get()}")
                return True
            # Give up now if sleeping once more would overshoot the deadline.
            next_try_at = loop.time() + sleep
            if stop_trying_at is not None and next_try_at > stop_trying_at:
                return False
            print(f"{time.time()}: {task.get_name()} blocks trying to acquire {self.name}")
            await asyncio.sleep(sleep)

    def release(self) -> bool:
        """Clear the token held by the current context.

        Returns:
            Whether ``expired`` is set at the time of release.

        Raises:
            RuntimeError: If ``TOKEN`` is ``None`` in the current context.
        """
        task = asyncio.current_task()
        expected_token = TOKEN.get()
        if expected_token is None:
            raise RuntimeError("Cannot release an unlocked lock")
        print(f"{time.time()}: {task.get_name()} releasing {expected_token}")
        TOKEN.set(None)
        return self.expired.is_set()
async def do_with_lock_task2(lock: Lock):
    """Take ``lock`` with token "abc", let it expire mid-work, then release.

    A task running ``do_with_lock_task3`` is started first and awaited at
    the end, so this coroutine only finishes once that task is done too.

    Raises:
        RuntimeError: If the lock could not be acquired.
    """
    this_task = asyncio.current_task()
    # Started before acquire(), i.e. before this task stores its own token.
    waiter = asyncio.create_task(do_with_lock_task3(lock))

    got_lock = await lock.acquire("abc", True)
    if not got_lock:
        raise RuntimeError(f"{this_task.get_name()} error")

    await asyncio.sleep(1)
    # Simulate the lock key expiring while this task is still working.
    lock.expired.set()
    print(f"{time.time()}: {this_task.get_name()} has not completed yet, redis expires lock key")
    await asyncio.sleep(2)
    lock.release()

    await asyncio.gather(waiter)
async def do_with_lock_task3(lock: Lock):
    """Wait for ``lock`` to expire, take it with token "xyz", then release.

    Raises:
        RuntimeError: If the lock could not be acquired.
    """
    this_task = asyncio.current_task()
    # Block until the expiry event is set.
    await lock.expired.wait()

    got_lock = await lock.acquire("xyz")
    if not got_lock:
        raise RuntimeError(f"{this_task.get_name()} error")

    await asyncio.sleep(5)
    lock.release()
async def main():
    """Create the shared lock and run ``do_with_lock_task2`` to completion."""
    ### OK ###
    shared_lock = Lock("my-lock")
    outer = asyncio.create_task(do_with_lock_task2(shared_lock))
    await asyncio.gather(outer)
# Run the demo only when executed as a script, not when imported.
if __name__ == "__main__":
    asyncio.run(main())
"""
1628038392.1417608: Task-2 acquired lock my-lock
1628038392.1417918: Token was None in Task-2
1628038392.1418037: Task-2 sets token to abc
1628038393.1431952: Task-2 has not completed yet, redis expires lock key
1628038393.1433778: Task-3 acquired lock my-lock
1628038393.1434047: Token was None in Task-3 < Task-2's token is not visible here: Task-3 runs in its own copy of the context, taken before Task-2 set abc
1628038393.14343: Task-3 sets token to xyz
1628038395.1458466: Task-2 releasing abc
1628038398.147317: Task-3 releasing xyz
"""
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment