@simonw
Created February 14, 2025 06:38

Detailed Analysis of Running SQLite Queries in an Asyncio Application

This analysis examines how the provided code—from a project such as Datasette or similar—is designed to run SQLite queries in an asyncio application. The design must reconcile the synchronous nature of SQLite (and its Python binding) with an asynchronous application that uses asyncio. We’ll break down the mechanics of the implementation, weigh its advantages and drawbacks, and then propose some alternative approaches.


1. How the Code Works

a. Dual-Mode Operation: Threaded vs. Non-Threaded

The Database class supports two execution “modes”:

  • Non-threaded mode: When self.ds.executor is None, the database methods use pre-established connections and run queries directly in the main thread (or current thread) while using asynchronous wrappers to keep the API consistent.
  • Threaded mode: When an executor is provided, the database read operations run on threads via asyncio.get_event_loop().run_in_executor(...) so that blocking database calls do not block the event loop.
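
The two modes can be illustrated with a minimal sketch. This is not the project's code: `run_query` and `demo` are hypothetical names, and the only assumption carried over from the description above is that a missing executor means "call directly" while a supplied executor means "hand the call to a worker thread".

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


async def run_query(fn, executor=None):
    """Run a blocking callable directly or on an executor thread."""
    if executor is None:
        # Non-threaded mode: call in the current thread. The event loop
        # is blocked for as long as fn() runs.
        return fn()
    # Threaded mode: submit the blocking call to the pool and await it.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, fn)


async def demo():
    direct = await run_query(lambda: 1 + 1)
    with ThreadPoolExecutor(max_workers=2) as pool:
        threaded = await run_query(lambda: 2 + 2, executor=pool)
    return direct, threaded
```

Both branches share one awaitable signature, which is what keeps the calling code identical in either mode.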

b. Read Operations

  • Single Connection & Thread-local Reuse:
    The execute_fn method is used to run queries for reading. In non-threaded mode, it creates or reuses a “read connection” (stored as self._read_connection) and configures it (using self.ds._prepare_connection).
    In threaded mode, an inner function (using thread-local storage via threading.local()) is executed in a thread pool using run_in_executor. This ensures each thread running a query has its own connection to avoid cross-thread issues.

  • Time-limit Enforcement:
    The execute method wraps the actual query execution in the context manager sqlite_timelimit. This context manager employs SQLite’s progress handler (via conn.set_progress_handler) to interrupt long-running queries based on a configurable time limit.
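
A time limit built on the progress handler could look roughly like the sketch below. It is an illustration of the technique, not the actual sqlite_timelimit implementation; the name `time_limit` and the choice of 1000 virtual-machine instructions between checks are assumptions.

```python
import sqlite3
import time
from contextlib import contextmanager


@contextmanager
def time_limit(conn, ms, n=1000):
    """Abort statements on conn that run longer than ms milliseconds."""
    deadline = time.perf_counter() + ms / 1000

    def handler():
        # SQLite calls this every n VM instructions; a non-zero return
        # value aborts the statement that is currently running.
        return 1 if time.perf_counter() >= deadline else 0

    conn.set_progress_handler(handler, n)
    try:
        yield
    finally:
        # Always remove the handler so later queries are unaffected.
        conn.set_progress_handler(None, n)
```

When the handler aborts a statement, Python's sqlite3 module raises sqlite3.OperationalError, which the caller can catch and report as a timeout.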

c. Write Operations: Dedicated Write Thread with Queue

Because SQLite allows only one writer per database file at a time, and a single connection should not run overlapping statements from several threads, the code serializes writes through a dedicated write thread:

  • Queueing Write Tasks:
    Methods like execute_write, execute_write_script, and execute_write_many call into execute_write_fn. When running in threaded mode, instead of applying the write immediately, the function is wrapped in a task (a WriteTask object) and put on a write queue (an instance of the standard queue.Queue).

  • Using Janus for Bridging Sync and Async:
    A janus.Queue is created for each write task as a “reply_queue.” Janus provides both synchronous and asynchronous interfaces to a queue, thereby allowing seamless communication between the synchronous write thread and asynchronous callers.

  • The Write Thread’s Loop:
    The dedicated write thread (started once and stored as self._write_thread) runs the _execute_writes method in an infinite loop. This loop:

    • Creates or reuses a “write connection” to the database by calling the connect(write=True) method.
    • Processes tasks one by one from the write queue.
    • For each task:
      • If isolated_connection is set, a new write connection is established just for that task, executed, and then closed.
      • Otherwise, the persistent write connection is used, and optionally a transaction is enforced using a context manager (with conn:).
    • Finally, the result or any exception is put onto the synchronous side of the janus queue (task.reply_queue.sync_q.put(result)), so the original asynchronous caller, awaiting reply_queue.async_q.get(), receives the response.
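
The queue-plus-write-thread pattern can be sketched with the standard library alone. Note the substitution: instead of a janus reply queue, this sketch hands results back through an asyncio future and loop.call_soon_threadsafe. The class name `WriteThread` and the `None` shutdown sentinel are assumptions made for illustration.

```python
import asyncio
import queue
import sqlite3
import threading


class WriteThread:
    def __init__(self, path):
        self.path = path
        self.tasks = queue.Queue()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        conn = sqlite3.connect(self.path)  # used by this thread only
        while True:
            item = self.tasks.get()
            if item is None:  # sentinel: stop the loop
                break
            fn, loop, future = item
            try:
                with conn:  # commit on success, roll back on error
                    result = fn(conn)
            except Exception as exc:
                loop.call_soon_threadsafe(future.set_exception, exc)
            else:
                loop.call_soon_threadsafe(future.set_result, result)
        conn.close()

    async def execute_write_fn(self, fn):
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self.tasks.put((fn, loop, future))
        return await future  # resolved from the write thread

    def close(self):
        self.tasks.put(None)
        self.thread.join()


async def demo():
    writer = WriteThread(":memory:")
    try:
        await writer.execute_write_fn(
            lambda c: c.execute("CREATE TABLE t (n INTEGER)").rowcount
        )
        await asyncio.gather(*(
            writer.execute_write_fn(
                lambda c, i=i: c.execute("INSERT INTO t VALUES (?)", (i,)).rowcount
            )
            for i in range(10)
        ))
        return await writer.execute_write_fn(
            lambda c: c.execute("SELECT count(*) FROM t").fetchone()[0]
        )
    finally:
        writer.close()
```

Ten concurrent callers enqueue their inserts, but the single thread applies them one after another, so no two writes ever overlap on the connection.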

d. Summary of Mechanisms for Concurrency

  • Separation of Concerns:
    Read operations get their own thread-local connection and use a thread pool when in threaded mode.
    Write operations are consolidated through a dedicated thread using a simple queue, ensuring they do not run concurrently or conflict over the same connection.

  • Error Handling and Isolation:
    The mechanism supports “isolated” writes where a dedicated connection is opened for a particular task to avoid interference with an ongoing transaction on the persistent write connection.
    Errors encountered either in setting up the connection or during execution are packaged and returned via the queue so that the asynchronous caller can handle them.
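
The thread-local read path summarized above can be sketched as follows. The helper names (`thread_connection`, `execute_fn`, `demo`) are hypothetical, and the sketch never closes the cached connections, which real code would need to handle at shutdown.

```python
import asyncio
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()


def thread_connection(path):
    """Return this thread's connection for path, creating it on first use."""
    conns = getattr(_local, "conns", None)
    if conns is None:
        conns = _local.conns = {}
    if path not in conns:
        conns[path] = sqlite3.connect(path)
    return conns[path]


async def execute_fn(executor, path, fn):
    def in_thread():
        # Runs on a worker thread, so the connection is looked up there.
        return fn(thread_connection(path))

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, in_thread)


async def demo():
    # One worker guarantees both calls land on the same thread.
    with ThreadPoolExecutor(max_workers=1) as pool:
        first = await execute_fn(pool, ":memory:", id)
        second = await execute_fn(pool, ":memory:", id)
        value = await execute_fn(
            pool, ":memory:", lambda c: c.execute("SELECT 1 + 1").fetchone()[0]
        )
    return first == second, value
```

Because each connection is created and used on the same worker thread, sqlite3's default same-thread check never trips.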


2. Pros and Cons of This Design

Advantages

  • Safe Serialization of Writes:
    By funneling all write operations through a single dedicated thread and queue, the design avoids SQLite’s potential pitfalls when multiple threads try to write concurrently on the same connection.

  • Non-Blocking Async API:
    The use of run_in_executor for read operations and a dedicated thread for write operations means that blocking SQLite calls do not block the main asyncio event loop. This helps maintain responsiveness in the overall async application.

  • Flexible Mode Selection:
    The design supports a “non-threaded mode” (when no executor is provided) for simpler use cases, while scaling to threaded execution when higher concurrency is needed.

  • Progress Handler for Time-Limits:
    Using sqlite_timelimit with SQLite’s progress handler adds a layer of protection against runaway queries by interrupting them when they take too long.

Disadvantages

  • Increased Complexity:
    Managing two different modes (threaded vs. non-threaded), a dedicated write thread, and the bridging via janus for write requests introduces considerable complexity. The code must juggle multiple connections, queues, and error paths.

  • Resource Management Challenges:
    The write thread runs in an infinite loop and requires careful management of connections (e.g., cleaning up connections and the thread itself when shutting down). Without proper shutdown procedures, there is a risk of leaving lingering threads or open file handles.

  • Overhead of Thread Hopping:
    Although using an executor avoids blocking the event loop, every read query in threaded mode involves a context switch and potentially unnecessary overhead when many queries are lightweight.

  • Dependency on Third-Party Libraries:
    The janus library (for bridging sync and async queues) is an external dependency, and the hand-rolled thread management is custom code; both must be maintained and understood by future developers.

  • Potential for Suboptimal Concurrency:
    While write queries are serialized for safety, this can become a bottleneck if the application needs to handle many concurrent write requests. Also, using a separate isolated write connection for each “isolated” task could lead to frequent connection setup overhead.


3. Alternative Mechanisms That Might Work Better

Given the challenges of juggling blocking I/O with asyncio, consider these alternatives:

a. Use an Async-Aware SQLite Library (e.g., aiosqlite)

  • Description:
    Libraries like aiosqlite provide an asynchronous interface for SQLite by running each connection’s blocking calls on a dedicated background thread and exposing the results as awaitables.
  • Benefits:
    This approach abstracts away much of the boilerplate of managing thread pools, dedicated write threads, and complex queueing logic. The API is cleaner and more “async/await friendly,” reducing the risk of subtle concurrency bugs.
  • Potential Downsides:
    Although aiosqlite manages its worker thread internally, you still need to be aware of SQLite limitations (such as serialized writes). However, the library is designed for these scenarios and has already solved many of the common pitfalls.

b. Use a Connection Pooling Mechanism

  • Description:
    Instead of a dedicated write thread and thread-local connections for reads, a connection pool can be implemented where each async task obtains its own connection from the pool.
  • Benefits:
    A pool can simplify resource management and allow multiple connections to exist, which may better support a mix of read and write operations if properly configured (for example, by enabling WAL journal mode so that readers are not blocked by the writer).
  • Potential Downsides:
    Pool management introduces its own complexity, and ensuring that write operations remain serializable (or are safely concurrent) may still require additional locking mechanisms.
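
One configuration that helps a pool mix reads and writes is SQLite's write-ahead log. The sketch below, with the hypothetical function name `wal_demo` and a throwaway temporary database, shows a reader proceeding while another connection holds an uncommitted write; it assumes the database lives on a local filesystem where WAL is available.

```python
import os
import sqlite3
import tempfile


def wal_demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        writer = sqlite3.connect(path, timeout=0)
        reader = sqlite3.connect(path, timeout=0)
        try:
            mode = writer.execute("PRAGMA journal_mode=WAL").fetchone()[0]
            writer.execute("CREATE TABLE t (n INTEGER)")
            writer.execute("INSERT INTO t VALUES (1)")
            writer.commit()
            # Leave a second insert uncommitted: the writer now holds
            # the write lock.
            writer.execute("INSERT INTO t VALUES (2)")
            # The reader is not blocked and sees the last committed state.
            seen = reader.execute("SELECT count(*) FROM t").fetchall()[0][0]
            writer.commit()
            after = reader.execute("SELECT count(*) FROM t").fetchall()[0][0]
            return mode, seen, after
        finally:
            reader.close()
            writer.close()
```

WAL does not lift the one-writer-at-a-time rule, so a pool still needs a strategy for competing writers.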

c. Offload All Database Operations to a Separate Process

  • Description:
    Another model is to run all SQLite queries in a separate process (or even microservice) and communicate with it via an IPC mechanism (such as a REST API, message queue, or RPC).
  • Benefits:
    This separates concerns entirely and prevents the blocking I/O of SQLite from ever affecting the asyncio event loop.
  • Potential Downsides:
    It introduces inter-process communication overhead and requires careful error handling and orchestration between processes. It may be overkill for many applications.

d. Build on Top of Higher-Level Abstractions

  • Description:
    Frameworks or ORMs that are designed for async operations (such as Tortoise ORM or SQLAlchemy 1.4+ with asyncio support) can handle many of these concurrency challenges internally.
  • Benefits:
    These libraries typically provide robust connection handling, pooling, and concurrency management that relieve application code from these lower-level details.
  • Potential Downsides:
    Integrating a higher-level framework might require significant refactoring if the application is already built around raw SQLite connections and custom query handling.

Conclusion

The presented code offers a solution that mediates between SQLite’s synchronous API and asyncio’s non-blocking design by:

  • Running read operations in an executor (or via thread-local connections),
  • Serializing writes through a dedicated thread and write queue with janus acting as a bridge.

While this design has notable benefits in terms of data integrity and event loop responsiveness, its complexity and potential performance bottlenecks motivate alternatives, such as adopting a library like aiosqlite, connection pooling, or offloading to a separate process, which could simplify the implementation and improve scalability.

By choosing an alternative that better matches the application’s expected load and operational context, developers can often cut boilerplate and lower the risk of concurrency issues while keeping the benefits of asynchronous programming.

@simonw

simonw commented Feb 14, 2025

git clone https://github.com/simonw/datasette
cd datasette/datasette
files-to-prompt database.py utils/__init__.py -c | \
  llm -m o3-mini -o reasoning_effort high \
  -s 'Output in markdown a detailed analysis of how this code handles the challenge of running SQLite queries from a Python asyncio application. Explain how it works in the first section, then explore the pros and cons of this design. In a final section propose alternative mechanisms that might work better.'

@simonw

simonw commented Feb 14, 2025

llm -c 'discuss the thread pool idea in a lot more detail. Explain exactly how it would work, why it would be better than the current implementation and provide example Python code showing how it could be built'

Using a Thread Pool with Connection Pooling for SQLite in Asyncio

One alternative to the current approach—which uses separate read and write code paths along with a dedicated “write thread” and janus queues—is to use a thread‐pool based connection pool. In this design, all database operations (both reads and writes) are offloaded to a pool of worker threads. Each worker thread has its own SQLite connection drawn from a pool. This approach has several benefits and can simplify the orchestration of database I/O.


How It Would Work

The Basic Idea

  1. Connection Pooling:
    A pool of SQLite connections is created and maintained. The pool might have, for example, 5–10 connections. These are created at startup and then recycled between queries.

    • For read operations, different threads can borrow different connections concurrently.
    • For write operations, SQLite still allows only one writer per database file at a time, whichever connection issues the write. The pool therefore needs either a single designated write connection or a busy timeout (plus retries) so that competing writers wait for the lock instead of failing with “database is locked”. SQLite’s “serialized” threading mode makes it safe to share a connection object between threads; it does not make writes run in parallel.
  2. ThreadPool Execution:
    Instead of manually spinning up a dedicated thread for writes and using janus queues, all blocking database calls are submitted to a thread pool via the asyncio event loop (using run_in_executor).

    • Each call is submitted as a job to a ThreadPoolExecutor which then picks an available worker thread.
    • The worker thread retrieves a connection from the connection pool, runs the query synchronously, then returns the connection to the pool when finished.
  3. Integration With Asyncio:
    The submission and the awaiting of results are done entirely within the asyncio event loop. The blocking code is isolated in worker threads:

    • The main code never blocks because it awaits the Future returned by run_in_executor.
    • There is no need for a dedicated “write” thread with a custom queue since the thread pool handles job queuing automatically.

Why This Is Better

  • Reduced Complexity:
    The logic for managing a dedicated write thread, handling a custom janus queue, and choosing between “isolated” and persistent connections is replaced by standard connection–pool logic and an executor that is built into Python’s standard library.

  • Scalability and Flexibility:
    With a connection pool, you can configure the maximum number of concurrent connections. Read operations can run concurrently if your workload is read–intensive. Writes issued from several connections are still applied one at a time by SQLite’s file lock, so moderate write workloads work as long as each connection has a busy timeout that lets it wait for the lock.

  • Better Resource Management:
    A dedicated pool makes it easier to ensure that connections are properly closed on shutdown. The executor’s thread pool manages the number of threads without having to write custom infinite loops.

  • Standard and Maintainable Code:
    Using the built-in ThreadPoolExecutor together with a common connection pool pattern tends to be more maintainable and easier for new developers to understand compared to custom thread and queue management.
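
Before relying on a pool for writes, it helps to see what happens when two pooled connections try to write at once. The following sketch (hypothetical function name, temporary database, busy timeout deliberately set to zero) shows the second writer being refused until the first commits.

```python
import os
import sqlite3
import tempfile


def second_writer_is_blocked():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        # isolation_level=None: BEGIN and COMMIT are issued explicitly.
        # timeout=0: fail immediately instead of waiting for the lock.
        first = sqlite3.connect(path, timeout=0, isolation_level=None)
        second = sqlite3.connect(path, timeout=0, isolation_level=None)
        try:
            first.execute("CREATE TABLE t (n INTEGER)")
            first.execute("BEGIN IMMEDIATE")  # take the write lock
            first.execute("INSERT INTO t VALUES (1)")
            try:
                second.execute("BEGIN IMMEDIATE")
                blocked = False
            except sqlite3.OperationalError as exc:
                blocked = "locked" in str(exc)
            first.execute("COMMIT")
            # With the lock released, the second writer can proceed.
            second.execute("BEGIN IMMEDIATE")
            second.execute("INSERT INTO t VALUES (2)")
            second.execute("COMMIT")
            count = first.execute("SELECT count(*) FROM t").fetchall()[0][0]
            return blocked, count
        finally:
            second.close()
            first.close()
```

With sqlite3's default timeout of 5 seconds the second writer would wait for the lock rather than fail at once, which is why pooled writes often appear to work under light load.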


Example Python Code

Below is an example of how one might build a connection pool backed by a ThreadPoolExecutor for SQLite in an asyncio application.

Step 1. Create a Connection Pool

We create a simple pool using the standard library’s Queue. On initialization, we create a fixed number of connections.

import sqlite3
from queue import Queue, Empty

class SQLiteConnectionPool:
    def __init__(self, db_path, pool_size=5, **connect_kwargs):
        self.db_path = db_path
        self.pool = Queue(maxsize=pool_size)
        self.pool_size = pool_size
        self.connect_kwargs = connect_kwargs
        self._initialize_pool()

    def _initialize_pool(self):
        for _ in range(self.pool_size):
            conn = sqlite3.connect(self.db_path, check_same_thread=False, **self.connect_kwargs)
            self.pool.put(conn)

    def get_connection(self, timeout=None):
        try:
            return self.pool.get(timeout=timeout)
        except Empty:
            raise RuntimeError("No available database connections in the pool.")

    def return_connection(self, conn):
        self.pool.put(conn)

    def close_all(self):
        while not self.pool.empty():
            conn = self.pool.get()
            conn.close()
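
A small context manager makes it harder to forget returning a connection. This is a sketch under the assumption that the pool is a plain queue.Queue of connections, as in the class above; `borrowed` is a hypothetical helper name.

```python
import queue
import sqlite3
from contextlib import contextmanager


@contextmanager
def borrowed(pool, timeout=None):
    """Borrow a connection from a queue.Queue and always hand it back."""
    # With a timeout, queue.Empty is raised if no connection frees up.
    conn = pool.get(timeout=timeout)
    try:
        yield conn
    finally:
        pool.put(conn)


pool = queue.Queue()
pool.put(sqlite3.connect(":memory:", check_same_thread=False))

with borrowed(pool) as conn:
    row = conn.execute("SELECT 1").fetchone()
```

The connection goes back to the pool even if the body raises, which replaces the explicit try/finally around each query.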

Step 2. Create a Database Executor Using ThreadPoolExecutor

We wrap database operations in functions that run on worker threads. The functions always get a connection from the pool, run the SQL query, and then return the connection.

import asyncio
from concurrent.futures import ThreadPoolExecutor

class AsyncDB:
    def __init__(self, db_path, pool_size=5):
        # Requires a running event loop: construct AsyncDB inside a coroutine.
        self.loop = asyncio.get_running_loop()
        self.executor = ThreadPoolExecutor(max_workers=pool_size)
        self.pool = SQLiteConnectionPool(db_path, pool_size=pool_size)
        
    async def execute(self, sql, params=None, commit=False):
        """
        Execute an SQL statement using the thread pool.
        If commit is True, then wrap the operation in a transaction.
        """
        return await self.loop.run_in_executor(
            self.executor,
            self._execute_blocking,
            sql,
            params or (),
            commit
        )

    def _execute_blocking(self, sql, params, commit):
        conn = self.pool.get_connection()
        try:
            cur = conn.cursor()
            cur.execute(sql, params)
            if commit:
                conn.commit()
                return None
            # For read operations, fetch all rows
            return cur.fetchall()
        except Exception:
            # Roll back unconditionally so the pooled connection is never
            # returned with a transaction still open, then re-raise.
            conn.rollback()
            raise
        finally:
            self.pool.return_connection(conn)

    async def executemany(self, sql, seq_of_params, commit=False):
        return await self.loop.run_in_executor(
            self.executor,
            self._executemany_blocking,
            sql,
            seq_of_params,
            commit
        )

    def _executemany_blocking(self, sql, seq_of_params, commit):
        conn = self.pool.get_connection()
        try:
            cur = conn.cursor()
            cur.executemany(sql, seq_of_params)
            if commit:
                conn.commit()
                return None
            return cur.fetchall()
        except Exception:
            # Roll back unconditionally so the pooled connection is never
            # returned with a transaction still open, then re-raise.
            conn.rollback()
            raise
        finally:
            self.pool.return_connection(conn)

    def close(self):
        self.executor.shutdown(wait=True)
        self.pool.close_all()

Step 3. Using the AsyncDB in an Asyncio Application

Here is how you might use the above class in an asyncio application.

import asyncio

async def main():
    db = AsyncDB("example.db", pool_size=5)
    
    # Create a table (write operation)
    await db.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)", commit=True)
    
    # Insert a record
    await db.execute("INSERT INTO users (name) VALUES (?)", params=("Alice",), commit=True)
    
    # Query records
    rows = await db.execute("SELECT * FROM users")
    print("Users:", rows)
    
    # Using executemany for batch insert
    await db.executemany("INSERT INTO users (name) VALUES (?)", [("Bob",), ("Carol",)], commit=True)
    
    rows = await db.execute("SELECT * FROM users")
    print("Updated Users:", rows)
    
    # Close the database connection pool
    db.close()

if __name__ == "__main__":
    asyncio.run(main())

Summary

In this thread pool + connection pooling approach:

  • A fixed-size pool of SQLite connections is created and reused.
  • All database operations are run in the background threads (managed by ThreadPoolExecutor).
  • The asynchronous application awaits the results without getting blocked by the database’s synchronous I/O.
  • The design is simpler and more maintainable than custom thread + queue juggling. It automatically scales (to a limited extent) through a standard connection pool and thread pool model.

This design is more modular and leverages Python’s built-in capabilities, thereby minimizing custom code and potential bugs while simplifying resource management.

@simonw

simonw commented Feb 14, 2025

llm -c 'write me a stress test for that AsyncDB() code - I want something I can run that will test and measure its concurrent limits including for writes and reads'

Below is an example script that can be run to stress‐test the AsyncDB class. This script creates a test table and then launches many concurrent “write” and “read” tasks. It measures the total execution time and prints a final count of rows. You can tweak the number of concurrent operations (both inserts and selects) to probe the limits of your thread pool and SQLite connections.

Below is the complete code for the stress test:

#!/usr/bin/env python3
"""
Stress test for the AsyncDB connection pool and ThreadPoolExecutor design.
This script launches a number of concurrent write (inserts) and read (selects)
operations against an SQLite database. It then measures the total execution time
and prints the results.

Make sure the AsyncDB and SQLiteConnectionPool classes have been defined.
(You can integrate the definitions from your AsyncDB code above.)
"""

import asyncio
import time
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from queue import Queue, Empty

# ----- Connection Pool and AsyncDB Class Definitions -----
# (These definitions are taken from the previous example.)

class SQLiteConnectionPool:
    def __init__(self, db_path, pool_size=5, **connect_kwargs):
        self.db_path = db_path
        self.pool = Queue(maxsize=pool_size)
        self.pool_size = pool_size
        self.connect_kwargs = connect_kwargs
        self._initialize_pool()

    def _initialize_pool(self):
        for _ in range(self.pool_size):
            conn = sqlite3.connect(self.db_path, check_same_thread=False, **self.connect_kwargs)
            self.pool.put(conn)

    def get_connection(self, timeout=None):
        try:
            return self.pool.get(timeout=timeout)
        except Empty:
            raise RuntimeError("No available database connections in the pool.")

    def return_connection(self, conn):
        self.pool.put(conn)

    def close_all(self):
        while not self.pool.empty():
            conn = self.pool.get()
            conn.close()


class AsyncDB:
    def __init__(self, db_path, pool_size=5):
        # Requires a running event loop: construct AsyncDB inside a coroutine.
        self.loop = asyncio.get_running_loop()
        self.executor = ThreadPoolExecutor(max_workers=pool_size)
        self.pool = SQLiteConnectionPool(db_path, pool_size=pool_size)
        
    async def execute(self, sql, params=None, commit=False):
        """
        Execute an SQL statement using the thread pool.
        If commit is True, run within a transaction.
        For read queries, the result (the full fetched rows) is returned.
        """
        return await self.loop.run_in_executor(
            self.executor,
            self._execute_blocking,
            sql,
            params or (),
            commit
        )

    def _execute_blocking(self, sql, params, commit):
        conn = self.pool.get_connection()
        try:
            cur = conn.cursor()
            cur.execute(sql, params)
            if commit:
                conn.commit()
                result = None
            else:
                result = cur.fetchall()
            return result
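        except Exception:
            # Roll back unconditionally so the pooled connection is never
            # returned with a transaction still open, then re-raise.
            conn.rollback()
            raise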
        finally:
            self.pool.return_connection(conn)

    async def executemany(self, sql, seq_of_params, commit=False):
        return await self.loop.run_in_executor(
            self.executor,
            self._executemany_blocking,
            sql,
            seq_of_params,
            commit
        )

    def _executemany_blocking(self, sql, seq_of_params, commit):
        conn = self.pool.get_connection()
        try:
            cur = conn.cursor()
            cur.executemany(sql, seq_of_params)
            if commit:
                conn.commit()
                result = None
            else:
                result = cur.fetchall()
            return result
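        except Exception:
            # Roll back unconditionally so the pooled connection is never
            # returned with a transaction still open, then re-raise.
            conn.rollback()
            raise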
        finally:
            self.pool.return_connection(conn)

    def close(self):
        self.executor.shutdown(wait=True)
        self.pool.close_all()
        
# ----- Stress Test Code -----

async def stress_test():
    db_path = "stress_test.db"
    pool_size = 5  # Adjust pool size as needed
    db = AsyncDB(db_path, pool_size=pool_size)

    # Ensure a clean test table.
    print("Setting up test table...")
    await db.execute("DROP TABLE IF EXISTS stress_table", commit=True)
    await db.execute("CREATE TABLE stress_table (id INTEGER PRIMARY KEY, value TEXT)", commit=True)
    
    # Define a write (insert) task.
    async def insert_task(task_id):
        value = f"value_{task_id}"
        await db.execute("INSERT INTO stress_table (value) VALUES (?)", params=(value,), commit=True)
    
    # Define a read (select) task.
    async def read_task(task_id):
        result = await db.execute("SELECT COUNT(*) FROM stress_table")
        # The result is a list of tuples, e.g. [(count,)]
        count = result[0][0] if result and len(result) > 0 else 0
        return f"Read Task {task_id}: current count = {count}"

    # Number of concurrent operations.
    num_inserts = 300
    num_reads = 300

    tasks = []
    start_time = time.perf_counter()

    # Schedule a mix of insert and read tasks.
    # Here we mix them so that every few writes, we perform a read.
    for i in range(num_inserts):
        tasks.append(asyncio.create_task(insert_task(i)))
        if i % 3 == 0:
            tasks.append(asyncio.create_task(read_task(i)))

    # Also add additional independent read tasks concurrently
    for i in range(num_reads):
        tasks.append(asyncio.create_task(read_task(i + num_inserts)))
    
    # Wait for all tasks to complete; gather any exceptions.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    end_time = time.perf_counter()
    
    print(f"Stress test completed in {end_time - start_time:.2f} seconds")
    
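    # Report failures (gather returned them instead of raising),
    # then print a sample of read results.
    failures = [res for res in results if isinstance(res, Exception)]
    print(f"Failed operations: {len(failures)}")
    for res in [r for r in results if isinstance(r, str)][:10]:
        print(res)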

    # Final state check: get total count of inserted rows.
    final = await db.execute("SELECT COUNT(*) FROM stress_table")
    final_count = final[0][0] if final else 0
    print("Expected inserted rows:", num_inserts)
    print("Final row count in stress_table:", final_count)

    # Clean up
    db.close()

if __name__ == '__main__':
    asyncio.run(stress_test())

How It Works

  1. The script first defines a test table (stress_table) in the database.
  2. It defines two asynchronous functions:
    • insert_task(task_id): Inserts a new row into the table.
    • read_task(task_id): Runs a simple SELECT COUNT(*) query to check the current row count.
  3. A mix of write and read tasks are scheduled concurrently using asyncio.create_task.
    • In this example, for every three writes, a read is also scheduled.
    • Additionally, extra read tasks are scheduled to further stress-test concurrency.
  4. The script uses asyncio.gather() to wait for all tasks to finish and measures
    the total elapsed time using time.perf_counter().
  5. Finally, the final count of records is printed to validate that all inserts were made successfully.

Why This Stress Test Is Useful

  • It simulates a high-concurrency workload by launching hundreds of operations concurrently.
  • It measures the overall throughput (total elapsed time) and, indirectly, the limits of your
    ThreadPoolExecutor and connection pool.
  • It tests both writes and reads under load so you can see how they interact when competing for pool connections.
  • Adjusting the number of tasks and pool size will help you find the optimal configuration for your environment.

You can run this script in your terminal (after ensuring that AsyncDB and its helpers are accessible) and tweak the parameters to further stress the system as needed.
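
Total elapsed time hides how individual operations behave. A small helper along these lines could record per-operation latency as well; `timed` and `measure` are hypothetical names, and `coro_fn` stands for any task function that takes a task id, such as the insert and read tasks above.

```python
import asyncio
import statistics
import time


async def timed(coro_fn, *args):
    """Await coro_fn(*args) and return how long it took in seconds."""
    start = time.perf_counter()
    await coro_fn(*args)
    return time.perf_counter() - start


async def measure(coro_fn, n):
    """Run n concurrent calls and summarize throughput and latency."""
    wall_start = time.perf_counter()
    latencies = await asyncio.gather(*(timed(coro_fn, i) for i in range(n)))
    wall = time.perf_counter() - wall_start
    return {
        "ops_per_sec": n / wall,
        "median_s": statistics.median(latencies),
        "max_s": max(latencies),
    }
```

A large gap between the median and the maximum usually means some tasks spent most of their time waiting for a pool connection or for the write lock.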

@simonw

simonw commented Feb 14, 2025

llm -c "now write code that is very likely to break the database, through concurrent writes or whatever. DO not output the classes again."

#!/usr/bin/env python3
"""
This script is designed to aggressively stress the database with concurrent writes
that update the same row. It is very likely to generate locking conflicts. SQLite's
file locking keeps the database itself consistent, but updates that fail are lost
unless retry logic is in place.

WARNING:
– This script issues many concurrent UPDATE statements on the same record.
– It is intended for testing purposes only and may result in errors like "database is locked".
– Use it on a test database only!

Assumptions:
  • An AsyncDB instance (from the previously defined code) exists.
  • Do not re-include class definitions here. The AsyncDB instance is imported or defined elsewhere.
  
To run this test, ensure that AsyncDB is accessible in your PYTHONPATH.
"""

import asyncio
import random

# The AsyncDB instance is created inside main() so that it is bound to the
# event loop that asyncio.run() starts. For this test, we intentionally use a
# pool size > 1 so that multiple connections can attempt writes concurrently.
# Adjust the pool_size if desired.
db = None

async def conflicting_write(task_id):
    """
    This update function is deliberately very aggressive:
      - It updates the same row (id == 1) by incrementing its value.
      - A random short delay is added to increase the chance of overlapping transactions.
      - With many concurrent tasks, at least some will experience locking conflicts.
    """
    try:
        await asyncio.sleep(random.uniform(0, 0.05))
        # Attempt to update the same row concurrently.
        # Each call commits immediately.
        await db.execute("UPDATE test SET value = value + 1 WHERE id = 1", commit=True)
        print(f"Task {task_id} update successful")
    except Exception as e:
        # "database is locked" errors surface here once a writer has waited
        # longer than the connection's busy timeout (5 seconds by default).
        print(f"Task {task_id} failed with exception: {e}")

async def main():
    global db
    # Create the AsyncDB instance pointing to a test database.
    db = AsyncDB("stress_break_test.db", pool_size=5)

    # Set up the table:
    # Drop any existing test table, create a new one, and insert a single initial row.
    print("Setting up test table...")
    await db.execute("DROP TABLE IF EXISTS test", commit=True)
    await db.execute("CREATE TABLE test (id INTEGER PRIMARY KEY, value INTEGER)", commit=True)
    await db.execute("INSERT INTO test (value) VALUES (0)", commit=True)
    
    # Launch a large number of concurrent update tasks to fight over that one row.
    num_tasks = 200  # Increase this number to make conflicts even more likely.
    tasks = [asyncio.create_task(conflicting_write(i)) for i in range(num_tasks)]
    
    # Wait for all tasks to finish (errors will be printed).
    await asyncio.gather(*tasks, return_exceptions=True)
    
    # Query and print the final value in the row.
    final = await db.execute("SELECT value FROM test WHERE id = 1")
    final_value = final[0][0] if final and len(final) > 0 else None
    print("Final value in test table (expected to be close to number of successful updates):", final_value)
    
    # Clean up the AsyncDB (shut down ThreadPoolExecutor and close connections)
    db.close()

if __name__ == '__main__':
    asyncio.run(main())
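
The retry logic that the script's docstring alludes to could be sketched as below. `with_retry` is a hypothetical helper; it assumes the operation is an argument-free coroutine function and that lock contention surfaces as sqlite3.OperationalError with "locked" in the message.

```python
import asyncio
import random
import sqlite3


async def with_retry(op, attempts=5, base_delay=0.01):
    """Retry op() with jittered exponential backoff on lock errors."""
    for attempt in range(attempts):
        try:
            return await op()
        except sqlite3.OperationalError as exc:
            # Only retry lock contention, and give up on the last attempt.
            if "locked" not in str(exc) or attempt == attempts - 1:
                raise
            delay = base_delay * (2 ** attempt) * random.uniform(0.5, 1.5)
            await asyncio.sleep(delay)
```

Usage would wrap the write in a lambda, for example `await with_retry(lambda: db.execute(sql, commit=True))`, so that each attempt creates a fresh coroutine.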
