Skip to content

Instantly share code, notes, and snippets.

@gnzsnz
Last active February 16, 2024 12:08
Show Gist options
  • Save gnzsnz/f50a04c4053a262b0670206c23ada1bb to your computer and use it in GitHub Desktop.
Save gnzsnz/f50a04c4053a262b0670206c23ada1bb to your computer and use it in GitHub Desktop.
Asyncio semaphores in python

How to use semaphores in asyncio

A short example showing asyncio semaphores

import asyncio
import time
from random import random

async def my_coroutine(sem, task_id):
    """Occupy one slot of ``sem`` while sleeping for a random interval.

    Args:
        sem: semaphore that bounds how many callers may be inside the
            critical section at once.
        task_id: label included in the progress messages.

    Returns:
        The number of seconds slept, a float in the range [0.5, 1.5).
    """
    def _stamp():
        # Local wall-clock time as HH:MM:SS, used in both progress messages.
        return time.strftime('%H:%M:%S', time.localtime())

    await sem.acquire()
    try:
        # critical section: at most as many callers as the semaphore allows
        print(f"Acquired id:{task_id}, time:{_stamp()}")
        delay = random() + .5
        await asyncio.sleep(delay)
        print(f"Released id:{task_id}, time:{_stamp()}, {delay}")
    finally:
        # Always hand the slot back, including when the sleep is cancelled.
        sem.release()
    return delay


async def main():
    """Run ten ``my_coroutine`` tasks with at most three active at a time.

    Prints the start and end wall-clock times, the elapsed running time,
    and the sum of the individual sleep durations returned by the tasks.
    """
    started = time.time()
    print(f"Start time:{time.strftime('%H:%M:%S', time.localtime(started))}")

    # Three slots: only three coroutines can be in the critical section.
    sem = asyncio.Semaphore(3)
    tasks = []
    for task_id in range(10):
        tasks.append(asyncio.create_task(my_coroutine(sem, task_id)))
    durations = await asyncio.gather(*tasks)

    finished = time.time()
    print(f"End time:{time.strftime('%H:%M:%S', time.localtime(finished))}")
    print(f"running time:{finished - started}")

    # Accumulate with a plain loop so the printed float is built by simple
    # left-to-right addition of the gathered durations.
    combined = 0
    for duration in durations:
        combined += duration
    print(f"executuion time: {combined}")

# asyncio.run() expects a coroutine object, so main has to be called here;
# passing the bare function raises an error instead of running the example.
asyncio.run(main())

# if you are in jupyter/ipython, await the coroutine directly instead:
#await main()

output

Start time:09:54:51
Acquired id:0, time:09:54:51
Acquired id:1, time:09:54:51
Acquired id:2, time:09:54:51
Released id:0, time:09:54:51, 0.543004737902306
Acquired id:3, time:09:54:51
Released id:2, time:09:54:52, 0.716452786407052
Acquired id:4, time:09:54:52
Released id:1, time:09:54:52, 0.8981319786398339
Acquired id:5, time:09:54:52
Released id:3, time:09:54:52, 1.0064910961164992
Acquired id:6, time:09:54:52
Released id:4, time:09:54:53, 1.3037485263905855
Acquired id:7, time:09:54:53
Released id:5, time:09:54:53, 1.4120800095210995
Acquired id:8, time:09:54:53
Released id:6, time:09:54:54, 1.315146059710378
Acquired id:9, time:09:54:54
Released id:7, time:09:54:54, 1.2501999443690721
Released id:8, time:09:54:55, 1.3127928613465163
Released id:9, time:09:54:55, 0.8168799396808626
End time:09:54:55
running time:3.688829183578491
executuion time: 10.574927940084205

The semaphore limits the number of concurrent executions of my_coroutine to 3. We then launch 10 coroutines at the same time. Each coroutine runs for 0.5 + random() seconds (between 0.5 and 1.5 seconds).

The total running time is 3.68 seconds, because up to three coroutines run concurrently. The combined running time of all coroutines is 10.57 seconds.

We can use asyncio.wait_for and asyncio.wait to modify the behaviour of the previous example:

async def main_timeout():
    """Run ten ``my_coroutine`` tasks, giving each awaited task a 1 s timeout.

    The tasks are created up front and share a three-slot semaphore. They
    are then awaited one after another with ``asyncio.wait_for``; a task
    that does not finish within its one-second window is cancelled by
    ``wait_for`` and reported, and contributes nothing to the total.

    Prints the start and end wall-clock times, the elapsed running time,
    and the sum of the sleep durations of the tasks that completed.
    """
    _start_time = time.time()
    print(f"Start time:{time.strftime('%H:%M:%S', time.localtime(_start_time))}")
    sem = asyncio.Semaphore(3)
    tasks = [asyncio.create_task(my_coroutine(sem, i)) for i in range(10)]

    result = []
    for _t in tasks:
        try:
            _r = await asyncio.wait_for(_t, timeout=1)
        # asyncio.TimeoutError is what wait_for raises on every Python
        # version; the builtin TimeoutError only matches from 3.11 on.
        except asyncio.TimeoutError as errn:
            print(f"task {_t} timeout {errn.args}")
        else:
            # Keep the duration so the total below reflects finished tasks.
            result.append(_r)

    _end_time = time.time()
    print(f"End time:{time.strftime('%H:%M:%S', time.localtime(_end_time))}")

    print(f"running time:{_end_time - _start_time}")
    _total = sum(result)
    print(f"executuion time: {_total}")

async def main_wait():
    """Run ten ``my_coroutine`` tasks and wait at most 3 s for them.

    The tasks share a three-slot semaphore. ``asyncio.wait`` returns after
    three seconds with two sets of tasks: those that finished and those
    still pending. The durations of the finished tasks are summed; the
    pending tasks are cancelled so they do not keep running after this
    coroutine returns.

    Prints each task's creation time, the start and end wall-clock times,
    the elapsed running time, and the sum of the finished tasks' sleep
    durations.

    Raises:
        Exception: whatever a finished task raised, re-raised when its
            result is read.
    """
    _start_time = time.time()
    print(f"Start time:{time.strftime('%H:%M:%S', time.localtime(_start_time))}")
    sem = asyncio.Semaphore(3)

    tasks = []
    for i in range(10):
        tasks.append(asyncio.create_task(my_coroutine(sem, i)))
        # Report when this task was created, not when main_wait started.
        _creation_time = time.time()
        _time = time.strftime('%H:%M:%S', time.localtime(_creation_time))
        print(f"task id:{i} created, time:{_time}")

    # asyncio.wait does not cancel anything on timeout; it only partitions.
    done, pending = await asyncio.wait(tasks, timeout=3)

    _end_time = time.time()
    print(f"End time:{time.strftime('%H:%M:%S', time.localtime(_end_time))}")

    print(f"running time:{_end_time - _start_time}")

    # The done set holds Task objects, so read each task's return value.
    _total = sum(_t.result() for _t in done)

    # Stop the unfinished tasks and let their cancellation complete.
    if pending:
        for _p in pending:
            _p.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

    print(f"executuion time: {_total}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment