@izxxr
Last active June 12, 2024 04:28
Python async I/O Locks

Locks are used to ensure exclusive access to a shared resource.

This statement might sound vague at first, but this short gist outlines how asyncio.Lock works and how you can apply it to your own use cases.

Introduction

Locks solve the problem of having to deal with concurrent tasks accessing a resource that you don't want to be accessed concurrently.

In many cases, you have a function that does something heavy in the background or asynchronously. The problem is that you don't want this function to run concurrently; in other words, you don't want several instances of its task to overlap. Instead, you want the tasks to happen one at a time.

A practical example is writing to a database. You may not want multiple users writing to the database at the same time. Instead, you want them to write one at a time. While one user is writing, the other users should wait, and once that user is done, the next waiter performs its writes. Locks let you handle this situation easily.

Example

Let's say we have a function that performs some heavy task in the background, such as writing to a database. In an async environment, this function may be called concurrently by many users. However, due to the nature of the task, you might not want it to run concurrently, as described in the example above. Consider this code snippet:

import asyncio


async def write_to_database(id: int):
    for i in range(5):
        print(f"User #{id} writing to database | Write: {i}/4")
        await asyncio.sleep(1)


async def main():
    tasks = []

    for user in range(5):
        t = asyncio.create_task(write_to_database(user))
        tasks.append(t)
        await asyncio.sleep(1)

    await asyncio.gather(*tasks)
        

if __name__ == "__main__":
    asyncio.run(main())

The output of this is roughly this:

User #0 writing to database | Write: 0/4
User #0 writing to database | Write: 1/4
User #1 writing to database | Write: 0/4
User #0 writing to database | Write: 2/4
User #1 writing to database | Write: 1/4
User #2 writing to database | Write: 0/4
User #1 writing to database | Write: 2/4
User #0 writing to database | Write: 3/4
User #2 writing to database | Write: 1/4
User #3 writing to database | Write: 0/4
User #0 writing to database | Write: 4/4
User #3 writing to database | Write: 1/4
User #1 writing to database | Write: 3/4
User #2 writing to database | Write: 2/4
User #4 writing to database | Write: 0/4
User #3 writing to database | Write: 2/4
User #4 writing to database | Write: 1/4
User #2 writing to database | Write: 3/4
User #1 writing to database | Write: 4/4
User #3 writing to database | Write: 3/4
User #2 writing to database | Write: 4/4
User #4 writing to database | Write: 2/4
User #3 writing to database | Write: 4/4
User #4 writing to database | Write: 3/4
User #4 writing to database | Write: 4/4

As you can see, the writes from different users are interleaved. No user waits for the previous user to finish its operations. This may be a problem if you don't want that much concurrent load on the database.

One common workaround that many people think of is setting a flag to True while the heavy task (write_to_database here) is running, and not starting the next task until the previous one sets the flag back to False. This is a hacky approach with several problems, such as race conditions and slowing down the task overall.
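To make the flag workaround concrete, here is a minimal sketch of what it might look like. The names busy, write_with_flag and run_flag_demo are hypothetical and not part of the original example, and the short sleeps only keep the demo fast. The sketch happens to stay correct only because no await sits between checking the flag and setting it; adding one there would let two tasks slip through together. Waiters also have to poll, so they waste wake-ups and start in no guaranteed order.

```python
import asyncio

busy = False  # hypothetical module-level flag guarding the "database"


async def write_with_flag(user: int, log: list) -> None:
    global busy
    # Poll until the flag is cleared. Every waiter wakes up on each poll,
    # and whichever one runs first after the flag clears gets to proceed.
    while busy:
        await asyncio.sleep(0.01)
    # No await between the check above and this assignment; an await here
    # would open a window for two tasks to enter at once.
    busy = True
    try:
        for i in range(2):
            log.append((user, i))
            await asyncio.sleep(0.01)  # stand-in for one write
    finally:
        busy = False


async def run_flag_demo() -> list:
    log = []
    await asyncio.gather(*(write_with_flag(u, log) for u in range(3)))
    return log


if __name__ == "__main__":
    print(asyncio.run(run_flag_demo()))
```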

Locks are perfect here! Let's modify the above example a bit to use locks:

import asyncio


async def write_to_db(id: int, lock: asyncio.Lock):
    await lock.acquire()  # Wait for the lock, then hold it until the job is done.

    try:
        for i in range(5):
            print(f"User #{id} is writing to database | Write: {i}/4")
            await asyncio.sleep(1)
    finally:
        lock.release()  # Release the lock so the next waiter can start its job.


async def main():
    tasks = []
    lock = asyncio.Lock()

    for user in range(5):
        t = asyncio.create_task(write_to_db(user, lock))
        tasks.append(t)
        await asyncio.sleep(1)

    await asyncio.gather(*tasks)
        

if __name__ == "__main__":
    asyncio.run(main())

Now examine the output:

User #0 is writing to database | Write: 0/4
User #0 is writing to database | Write: 1/4
User #0 is writing to database | Write: 2/4
User #0 is writing to database | Write: 3/4
User #0 is writing to database | Write: 4/4
User #1 is writing to database | Write: 0/4
User #1 is writing to database | Write: 1/4
User #1 is writing to database | Write: 2/4
User #1 is writing to database | Write: 3/4
User #1 is writing to database | Write: 4/4
User #2 is writing to database | Write: 0/4
User #2 is writing to database | Write: 1/4
User #2 is writing to database | Write: 2/4
User #2 is writing to database | Write: 3/4
User #2 is writing to database | Write: 4/4
User #3 is writing to database | Write: 0/4
User #3 is writing to database | Write: 1/4
User #3 is writing to database | Write: 2/4
User #3 is writing to database | Write: 3/4
User #3 is writing to database | Write: 4/4
User #4 is writing to database | Write: 0/4
User #4 is writing to database | Write: 1/4
User #4 is writing to database | Write: 2/4
User #4 is writing to database | Write: 3/4
User #4 is writing to database | Write: 4/4

As you can see, the tasks now run in the order they were spawned, and no writes happen concurrently.
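The acquire()/try/finally pattern can also be written with "async with", which acquires the lock on entry and releases it on exit. The following is a shortened sketch of the same idea, not the original code: the log list, the smaller loop counts and the short sleeps are assumptions made so the result can be checked quickly.

```python
import asyncio


async def write_to_db(user: int, lock: asyncio.Lock, log: list) -> None:
    # "async with" acquires the lock on entry and releases it on exit,
    # even if the body raises an exception.
    async with lock:
        for i in range(3):
            log.append((user, i))
            await asyncio.sleep(0.01)  # stand-in for one write


async def main() -> list:
    lock = asyncio.Lock()
    log = []
    await asyncio.gather(*(write_to_db(user, lock, log) for user in range(3)))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Each user's writes end up grouped together in the log, in the order the tasks were started.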

How does it work?

You may wonder what really happened, since the code example isn't obvious at first glance. Let's walk through it.

  • Lock.acquire() acquires the lock. If the lock is already held, the calling task waits (its coroutine is suspended while the event loop keeps running other tasks) until the lock is released.
  • Lock.release() releases the lock, allowing the next waiter to acquire it.
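The two methods can be observed directly with Lock.locked(), which reports whether the lock is currently held. This small sketch (the function name inspect_lock is made up for illustration) also shows that calling release() on a lock that is not held raises RuntimeError.

```python
import asyncio


async def inspect_lock() -> list:
    lock = asyncio.Lock()
    states = [lock.locked()]      # False: nobody holds the lock yet

    await lock.acquire()
    states.append(lock.locked())  # True: the lock is now held

    lock.release()
    states.append(lock.locked())  # False again after release()

    try:
        lock.release()            # releasing an unlocked lock is an error
    except RuntimeError:
        states.append("RuntimeError")

    return states


if __name__ == "__main__":
    print(asyncio.run(inspect_lock()))
```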

When Task #1 started, it acquired the lock by calling acquire() and started doing its job. While it performs its job, it holds the lock and other tasks can't acquire it. When Task #2 started and attempted to acquire the lock, it was put into the waiting queue because the lock was already held by Task #1.

Eventually Task #1 finishes its job and calls release(). This release() call takes the lock away from Task #1 and wakes up the next waiter, which is Task #2. Task #2, which was waiting in its acquire() call, now acquires the lock and starts performing its job.

This process continues until all tasks have performed their jobs, one by one.
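The handoff described above can be traced by recording an event at each step. This is only an illustrative sketch: the worker and trace functions and the events list are hypothetical helpers, and the short sleep stands in for the real job.

```python
import asyncio


async def worker(name: str, lock: asyncio.Lock, events: list) -> None:
    events.append(f"{name} requests lock")
    await lock.acquire()  # waits here if another task holds the lock
    try:
        events.append(f"{name} acquired lock")
        await asyncio.sleep(0.01)  # stand-in for the real job
    finally:
        events.append(f"{name} releases lock")
        lock.release()  # wakes up the next waiter, if any


async def trace() -> list:
    lock = asyncio.Lock()
    events = []
    await asyncio.gather(
        worker("Task #1", lock, events),
        worker("Task #2", lock, events),
    )
    return events


if __name__ == "__main__":
    for event in asyncio.run(trace()):
        print(event)
```

Task #2 requests the lock while Task #1 still holds it, and only acquires it after Task #1 has released it.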

threading.Lock

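asyncio.Lock is the async I/O counterpart of threading.Lock, which is suited to multi-threaded applications. Both expose the same method names (acquire(), release() and locked()), but they are not interchangeable: asyncio.Lock.acquire() is a coroutine that must be awaited and takes no timeout argument, and asyncio.Lock is not thread-safe.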
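For comparison, here is a rough sketch of both locks side by side. The helper names threaded_counter, acquire_with_timeout and timeout_demo are made up for this illustration. The first part protects a shared counter with threading.Lock across several threads; the second shows one way to get timeout-like behaviour with asyncio.Lock by wrapping acquire() in asyncio.wait_for().

```python
import asyncio
import threading


def threaded_counter(workers: int = 4, increments: int = 1000) -> int:
    lock = threading.Lock()
    total = 0

    def work() -> None:
        nonlocal total
        for _ in range(increments):
            with lock:  # plain "with"; acquiring blocks the calling thread
                total += 1

    threads = [threading.Thread(target=work) for _ in range(workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return total


async def acquire_with_timeout(lock: asyncio.Lock, seconds: float) -> bool:
    # asyncio.Lock.acquire() takes no timeout argument, so wrap the call.
    try:
        await asyncio.wait_for(lock.acquire(), timeout=seconds)
        return True
    except asyncio.TimeoutError:
        return False


async def timeout_demo() -> tuple:
    lock = asyncio.Lock()
    first = await acquire_with_timeout(lock, 0.05)   # lock is free
    second = await acquire_with_timeout(lock, 0.05)  # still held, times out
    return first, second


if __name__ == "__main__":
    print(threaded_counter())
    print(asyncio.run(timeout_demo()))
```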
