Skip to content

Instantly share code, notes, and snippets.

@nkhitrov
Last active May 27, 2023 18:31
Show Gist options
  • Save nkhitrov/4f7aefe9548df56d5bdf7b724fc582b4 to your computer and use it in GitHub Desktop.
Save nkhitrov/4f7aefe9548df56d5bdf7b724fc582b4 to your computer and use it in GitHub Desktop.
ThreadPoolExecutor with contextvars and asyncio
import contextvars
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
current_user = contextvars.ContextVar("ID of current user")
shared_user = contextvars.ContextVar("ID of shared user")
def say_hello():
time.sleep(1)
return f"Hello, {current_user.get()}"
def offer_sympathies():
time.sleep(1)
return f"Very sorry, {shared_user.get()}"
def set_context(context):
print(context)
for var, value in context.items():
print(var, value)
var.set(value)
def main():
current_user.set("koby")
copy = contextvars.copy_context()
with ThreadPoolExecutor(initializer=set_context, initargs=(copy,)) as exec:
tasks = (say_hello, offer_sympathies)
futures = (exec.submit(task) for task in tasks)
for future in as_completed(futures):
print(future.result())
async def async_main():
current_user.set("bass")
copy = contextvars.copy_context()
with ThreadPoolExecutor(initializer=set_context, initargs=(copy,)) as exec:
tasks = (say_hello, offer_sympathies)
futures = (exec.submit(task) for task in tasks)
for future in as_completed(futures):
print(future.result())
def copy_on_submit_main():
current_user.set("bass")
copy = contextvars.copy_context()
with ThreadPoolExecutor() as exec:
tasks = (say_hello, offer_sympathies)
futures = (exec.submit(_copy_context_on_start(task, copy)) for task in tasks)
for future in as_completed(futures):
print(future.result())
def _copy_context_on_start(func, context):
def _override():
set_context(context)
return func()
return _override
shared_user.set("igor")
main()
import asyncio
asyncio.run(async_main())
copy_on_submit_main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment