Skip to content

Instantly share code, notes, and snippets.

@kevinhikaruevans
Last active April 22, 2024 16:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kevinhikaruevans/8f21f48053de4d04d6cdb27521a5e532 to your computer and use it in GitHub Desktop.
Save kevinhikaruevans/8f21f48053de4d04d6cdb27521a5e532 to your computer and use it in GitHub Desktop.
Wait for one, cancel other `asyncio` tasks

Wait for one, cancel others

This is a little helper script that can be used to wait for a single asyncio task to finish, then cancel other waiting tasks.

Usage

My use-case for this is:

  • waiting on user input
  • ...while waiting on cancellation events input (via pub-sub)
  • ...while waiting on errors

when ANY of these events occurs, we should cancel the other running tasks.

The function takes in a list of asyncio.Tasks and returns a list of the respective results (or None, if the task was cancelled), i.e.

task_1_result, task_2_result, ... = await wait_for_one_cancel_others(
  task_1,
  task_2,
  ...
)
# `task_1_result` can be a value (if task 1 finishes) or `None` (if task 2 finishes)
# `task_2_result` can be None (if task 1 finishes) or a value (if task 2 finishes)
# ...and so forth

Examples

For an example:

  sensor_success, cancelled_success = await wait_for_one_cancel_others(
    asyncio.create_task(self.wait_for_all_sensors()),
    asyncio.create_task(self.wait_for_cancellation(next_order_item))
  )
  # Here, if we had a sensor input, `sensor_success` would return `True` and
  # `cancelled_success` would be `None`. 
  #
  # If there was a cancellation event, then instead `sensor_success` would be `None`
  # and `cancelled_success` would be `True`

or perhaps with a timeout:

  sensor_success, cancelled_success, timeout_success = await wait_for_one_cancel_others(
    asyncio.create_task(self.wait_for_all_sensors()),
    asyncio.create_task(self.wait_for_cancellation(next_order_item)),
    asyncio.create_task(asyncio.sleep(delay=10.0, result=True)) # NOTE: the `result=True` bit here, so the function returns True on timeout
  )
  
  # if the task times out, then sensor_success, cancelled_success would be None,
  # and timeout_success would be True

Here, we're waiting on some sensor input WHILE monitoring for a cancellation event (via NATS). When either one of these events occurs, it will cancel the other tasks passed into the function.

Important

It's important to note that, technically, two tasks can finish at the same time. This means you will get multiple non-null return values. You can mitigate this from occuring by ensuring a task does not return without being cancelled.

import asyncio
from typing import List, Optional, Set, TypeVar, Union
T = TypeVar('T')
def get_task_result_or_none(task: asyncio.Task[T], done: Set[asyncio.Task[T]], cancelled: Set[asyncio.Task[T]]) -> Optional[T]:
if task in done:
return task.result()
if task in cancelled:
return None
raise ValueError(f'Task {task.get_name()} not in `done` or `cancelled`')
async def wait_for_one_cancel_others(*tasks: asyncio.Task[T]) -> List[Optional[T]]:
"""Waits for one task to finish, then cancels the rest.
Note:
I *think* tasks could both finish at the same time. In this case, no tasks will be
cancelled and both will have their respective return values.
Example:
```python
sensor_success, cancelled_success = await wait_for_one_cancel_others(
asyncio.create_task(self.wait_for_all_sensors()),
asyncio.create_task(self.wait_for_cancellation(next_order_item))
)
```
Returns:
List[Optional[T]]: returns a list containing the return values of the tasks or None (if cancelled)
* Can return [None, None, ...] if the task itself was cancelled (shouldn't occur)
"""
try:
# Wait for any task to complete
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
# Cancel all other tasks that are still pending
for task in pending:
task.cancel()
return [get_task_result_or_none(task, done, pending) for task in tasks]
except asyncio.CancelledError:
return [None] * len(tasks)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment