
@simonw
Created March 8, 2024 12:37
  • Save simonw/4cecde4a729f4da0b5059b50c8e01359 to your computer and use it in GitHub Desktop.

2024-03-08T12:36:01 conversation: 01hrf11gesvvx0pec8wnxrr6zk

Model: claude-3-opus-20240229

Prompt:

Write me a Python function which accepts a path to a SQLite database

Then it spawns a new subprocess to run the following command:

python -c "import sqlite3; sqlite3...."

The code in that string will connect to the SQLite database path and then use vacuum into to create a brand new SQLite database copy of it in the /tmp directory

The function should return the path to the new SQLite database

I want to be able to call this from within an async def Python function, so it should use the asyncio mechanism for running subprocesses.

Response:

Here's a Python function that accomplishes what you described:

import asyncio
import os
import tempfile

async def vacuum_sqlite_database(db_path):
    # Generate a unique filename for the new SQLite database in the /tmp directory
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False, dir='/tmp') as tmp_file:
        new_db_path = tmp_file.name

    # Create the Python command to execute in a subprocess
    command = f"python -c \"import sqlite3; conn = sqlite3.connect('{db_path}'); conn.execute('VACUUM INTO \\\"{new_db_path}\\\"'); conn.close()\""

    # Run the command in a subprocess using asyncio
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # Wait for the subprocess to complete
    stdout, stderr = await proc.communicate()

    # Check if the subprocess completed successfully
    if proc.returncode == 0:
        return new_db_path
    else:
        # Handle any errors that occurred during the subprocess execution
        error_message = stderr.decode().strip()
        raise RuntimeError(f"Failed to vacuum SQLite database: {error_message}")

This function, named vacuum_sqlite_database, accepts a db_path parameter which represents the path to the SQLite database file.

Inside the function:

  1. It generates a unique filename for the new SQLite database in the /tmp directory using tempfile.NamedTemporaryFile(). The delete=False argument ensures that the temporary file is not automatically deleted when closed, and the dir='/tmp' argument specifies the directory where the file should be created.

  2. It constructs the Python command to execute in a subprocess. The command uses sqlite3 to connect to the specified database, execute the VACUUM INTO command to create a new copy of the database at the generated temporary file path, and then closes the connection.

  3. It uses asyncio.create_subprocess_shell() to run the command in a subprocess asynchronously. The stdout and stderr arguments are set to asyncio.subprocess.PIPE to capture the output and error streams of the subprocess.

  4. It waits for the subprocess to complete using await proc.communicate(), which returns the captured stdout and stderr outputs.

  5. It checks the return code of the subprocess. If the return code is 0 (indicating success), it returns the path to the newly created SQLite database. If an error occurred, it raises a RuntimeError with the error message obtained from the stderr output.

You can call this function from within an async def Python function using the await keyword. For example:

async def main():
    original_db_path = '/path/to/original/database.db'
    new_db_path = await vacuum_sqlite_database(original_db_path)
    print(f"New SQLite database created at: {new_db_path}")

Make sure to run the main() function using an event loop, such as asyncio.run(main()).

Note: This function assumes that the required Python dependencies (sqlite3) are available in the environment where the subprocess is executed.
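A more robust variant of the same idea — not part of the original response — avoids shell quoting entirely: the f-string command above breaks on database paths containing quotes, and a bare `python` may not exist on systems that only ship `python3`. A minimal sketch (the name `vacuum_sqlite_database_exec` is made up here; assumes SQLite 3.27+, which introduced `VACUUM INTO` and accepts a bound parameter for the target filename):

```python
import asyncio
import sys
import tempfile

def vacuum_command(db_path, new_db_path):
    # Build an argument list rather than a shell string, so paths with
    # spaces or quotes cannot break the command or inject shell syntax.
    code = (
        "import sqlite3, sys; "
        "conn = sqlite3.connect(sys.argv[1]); "
        "conn.execute('VACUUM INTO ?', (sys.argv[2],)); "
        "conn.close()"
    )
    return [sys.executable, "-c", code, db_path, new_db_path]

async def vacuum_sqlite_database_exec(db_path):
    # VACUUM INTO accepts an existing target only if it is empty, which
    # is exactly what NamedTemporaryFile(delete=False) leaves behind.
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False, dir="/tmp") as tmp_file:
        new_db_path = tmp_file.name

    proc = await asyncio.create_subprocess_exec(
        *vacuum_command(db_path, new_db_path),
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    _, stderr = await proc.communicate()

    if proc.returncode != 0:
        raise RuntimeError(f"Failed to vacuum SQLite database: {stderr.decode().strip()}")
    return new_db_path
```

Because the two paths travel as argv entries and a bound SQL parameter, no quoting or escaping is needed regardless of what characters they contain.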

simonw commented Mar 8, 2024

Time from initial idea to working proof of concept: 4 minutes.

simonw commented Mar 8, 2024

2024-03-08T12:44:32 conversation: 01hrf11gesvvx0pec8wnxrr6zk

Model: claude-3-opus-20240229

Prompt:

Is there a way I could add a progress bar to this? I do not know if SQLite vacuum into offers any kind of mechanism I could use to calculate progress.

Response:

Unfortunately, the VACUUM INTO command in SQLite does not provide a direct mechanism for tracking progress. However, you can estimate the progress based on the size of the original database file and the size of the new database file being created during the vacuuming process.

Here's an updated version of the vacuum_sqlite_database function that includes a progress bar:

import asyncio
import os
import tempfile
from tqdm import tqdm

async def vacuum_sqlite_database(db_path):
    # Get the size of the original database file
    original_size = os.path.getsize(db_path)

    # Generate a unique filename for the new SQLite database in the /tmp directory
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False, dir='/tmp') as tmp_file:
        new_db_path = tmp_file.name

    # Create the Python command to execute in a subprocess
    command = f"python -c \"import sqlite3; conn = sqlite3.connect('{db_path}'); conn.execute('VACUUM INTO \\\"{new_db_path}\\\"'); conn.close()\""

    # Run the command in a subprocess using asyncio
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # Create a progress bar
    progress_bar = tqdm(total=original_size, unit='B', unit_scale=True, desc='Vacuuming')

    # Update the progress bar based on the size of the new database file
    while proc.returncode is None:
        await asyncio.sleep(0.1)  # Wait for a short interval before checking again
        try:
            new_size = os.path.getsize(new_db_path)
            progress_bar.update(new_size - progress_bar.n)
        except FileNotFoundError:
            pass

    # Wait for the subprocess to complete
    stdout, stderr = await proc.communicate()

    # Close the progress bar
    progress_bar.close()

    # Check if the subprocess completed successfully
    if proc.returncode == 0:
        return new_db_path
    else:
        # Handle any errors that occurred during the subprocess execution
        error_message = stderr.decode().strip()
        raise RuntimeError(f"Failed to vacuum SQLite database: {error_message}")

In this updated version:

  1. It retrieves the size of the original database file using os.path.getsize(db_path) before starting the vacuuming process.

  2. It creates a progress bar using the tqdm library, specifying the total size of the original database file as the total value.

  3. While the subprocess is running (i.e., proc.returncode is None), it periodically checks the size of the new database file being created using os.path.getsize(new_db_path) and updates the progress bar accordingly.

  4. After the subprocess completes, it closes the progress bar.

The progress bar will display the estimated progress based on the size of the new database file being created. However, please note that this is an approximation and may not always accurately represent the actual progress of the VACUUM INTO command.

To use this updated function, make sure you have the tqdm library installed (pip install tqdm).

Keep in mind that the progress bar is based on the size of the new database file, which may not directly correspond to the actual progress of the vacuuming process. The VACUUM INTO command may have additional overhead or optimizations that affect the relationship between the file size and the progress. Nevertheless, this approach provides a reasonable estimate of the progress.
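The remaining-time estimate used here (and refined later in the thread) is plain proportional extrapolation from the average copy rate observed so far. As a standalone sketch (the helper name is hypothetical):

```python
def estimate_seconds_remaining(elapsed, bytes_so_far, total_bytes):
    # Assume bytes keep arriving at the average rate seen so far;
    # return None before any bytes have landed, since no rate is known yet.
    if bytes_so_far <= 0 or elapsed <= 0:
        return None
    rate = bytes_so_far / elapsed  # bytes per second so far
    return (total_bytes - bytes_so_far) / rate

print(estimate_seconds_remaining(2.0, 50, 200))  # 6.0
print(estimate_seconds_remaining(0.5, 0, 200))   # None
```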

simonw commented Mar 8, 2024

Got less impressive as I worked with it more: it kept producing blocking calls that prevented the iterator from being usable:



2024-03-08T12:47:27

Prompt:

I don't want to use tqdm. Instead, I would like the estimated progress to be sent to stdout from the child process as newline delimited JSON, each one an object that shows the bytes so far, the estimated total bytes and includes an estimate of the floating point number of seconds left, or null if the estimate cannot yet be calculated. These lines should be output at the rate of around one every half a second. Also update the calling function so it can be used as an async iterator looping over those progress objects

Response:

Certainly! Here's an updated version of the vacuum_sqlite_database function that sends progress information as newline-delimited JSON objects to stdout from the child process and allows the calling function to use it as an async iterator:

import asyncio
import json
import os
import tempfile
import time

async def vacuum_sqlite_database(db_path):
    # Get the size of the original database file
    original_size = os.path.getsize(db_path)

    # Generate a unique filename for the new SQLite database in the /tmp directory
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False, dir='/tmp') as tmp_file:
        new_db_path = tmp_file.name

    # Create the Python command to execute in a subprocess
    command = f"python -c \"import sqlite3, os, json, time; " \
              f"conn = sqlite3.connect('{db_path}'); " \
              f"conn.execute('VACUUM INTO \\\"{new_db_path}\\\"'); " \
              f"start_time = time.time(); " \
              f"while True: " \
              f"  try: " \
              f"    new_size = os.path.getsize('{new_db_path}'); " \
              f"    elapsed_time = time.time() - start_time; " \
              f"    remaining_time = (elapsed_time / new_size) * ({original_size} - new_size) if new_size > 0 else None; " \
              f"    progress = {{'bytes_so_far': new_size, 'total_bytes': {original_size}, 'seconds_remaining': remaining_time}}; " \
              f"    print(json.dumps(progress)); " \
              f"  except FileNotFoundError: " \
              f"    pass; " \
              f"  if new_size >= {original_size}: " \
              f"    break; " \
              f"  time.sleep(0.5); " \
              f"conn.close()\""

    # Run the command in a subprocess using asyncio
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # Async iterator to yield progress objects
    async def progress_iterator():
        while proc.returncode is None:
            line = await proc.stdout.readline()
            if line:
                progress = json.loads(line.decode().strip())
                yield progress

    # Wait for the subprocess to complete
    _, stderr = await proc.communicate()

    # Check if the subprocess completed successfully
    if proc.returncode == 0:
        return new_db_path, progress_iterator()
    else:
        # Handle any errors that occurred during the subprocess execution
        error_message = stderr.decode().strip()
        raise RuntimeError(f"Failed to vacuum SQLite database: {error_message}")

In this updated version:

  1. The Python command executed in the subprocess has been modified to send progress information as newline-delimited JSON objects to stdout. It calculates the bytes processed so far (new_size), the total bytes (original_size), and an estimate of the remaining time (remaining_time) based on the elapsed time and the current progress. It sends these values as a JSON object to stdout every 0.5 seconds until the vacuuming process is complete.

  2. The vacuum_sqlite_database function now returns a tuple containing the path to the new SQLite database and an async iterator (progress_iterator()) that yields progress objects.

  3. The progress_iterator() async generator function reads lines from the subprocess's stdout, decodes them as JSON objects, and yields the progress objects.

To use this updated function, you can loop over the async iterator to retrieve the progress objects:

async def main():
    original_db_path = '/path/to/original/database.db'
    new_db_path, progress_iterator = await vacuum_sqlite_database(original_db_path)
    async for progress in progress_iterator:
        bytes_so_far = progress['bytes_so_far']
        total_bytes = progress['total_bytes']
        seconds_remaining = progress['seconds_remaining']
        print(f"Progress: {bytes_so_far}/{total_bytes} bytes, Estimated time remaining: {seconds_remaining} seconds")
    print(f"New SQLite database created at: {new_db_path}")

In the main() function, you can use an async for loop to iterate over the progress objects yielded by the progress_iterator. Each progress object contains the bytes_so_far, total_bytes, and seconds_remaining values, which you can use to display the progress information.

Remember to run the main() function using an event loop, such as asyncio.run(main()).

2024-03-08T12:49:26

Prompt:

I do not think that will work because conn.execute() for the vacuum into is a blocking call, so it will not run the code that looks at the file sizes until it has finished. Instead of estimating the time in the child process move that activity to the parent object and run it while the vacuum subprocess is running.

Response:

Apologies for the confusion. You're right. Since conn.execute() is a blocking call, the code inside the subprocess won't execute until the vacuum operation is complete. Let's update the code to estimate the progress in the parent process while the vacuum subprocess is running.

Here's the updated version of the vacuum_sqlite_database function:

import asyncio
import json
import os
import tempfile
import time

async def vacuum_sqlite_database(db_path):
    # Get the size of the original database file
    original_size = os.path.getsize(db_path)

    # Generate a unique filename for the new SQLite database in the /tmp directory
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False, dir='/tmp') as tmp_file:
        new_db_path = tmp_file.name

    # Create the Python command to execute in a subprocess
    command = f"python -c \"import sqlite3; " \
              f"conn = sqlite3.connect('{db_path}'); " \
              f"conn.execute('VACUUM INTO \\\"{new_db_path}\\\"'); " \
              f"conn.close()\""

    # Run the command in a subprocess using asyncio
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # Async generator to yield progress objects
    async def progress_generator():
        start_time = time.time()
        while proc.returncode is None:
            try:
                new_size = os.path.getsize(new_db_path)
                elapsed_time = time.time() - start_time
                remaining_time = (elapsed_time / new_size) * (original_size - new_size) if new_size > 0 else None
                progress = {
                    'bytes_so_far': new_size,
                    'total_bytes': original_size,
                    'seconds_remaining': remaining_time
                }
                yield progress
            except FileNotFoundError:
                pass
            await asyncio.sleep(0.5)

    # Wait for the subprocess to complete
    _, stderr = await proc.communicate()

    # Check if the subprocess completed successfully
    if proc.returncode == 0:
        return new_db_path, progress_generator()
    else:
        # Handle any errors that occurred during the subprocess execution
        error_message = stderr.decode().strip()
        raise RuntimeError(f"Failed to vacuum SQLite database: {error_message}")

In this updated version:

  1. The Python command executed in the subprocess has been simplified to only perform the vacuum operation without any progress estimation.

  2. The progress estimation is now handled by the progress_generator() async generator function in the parent process.

  3. Inside the progress_generator(), it repeatedly checks the size of the new database file (new_size), calculates the elapsed time (elapsed_time), and estimates the remaining time (remaining_time) based on the current progress. It yields the progress information as a dictionary every 0.5 seconds until the subprocess completes.

  4. The vacuum_sqlite_database function returns a tuple containing the path to the new SQLite database and the async generator (progress_generator()) that yields progress objects.

You can use this updated function in the same way as before:

async def main():
    original_db_path = '/path/to/original/database.db'
    new_db_path, progress_generator = await vacuum_sqlite_database(original_db_path)
    async for progress in progress_generator:
        bytes_so_far = progress['bytes_so_far']
        total_bytes = progress['total_bytes']
        seconds_remaining = progress['seconds_remaining']
        print(f"Progress: {bytes_so_far}/{total_bytes} bytes, Estimated time remaining: {seconds_remaining} seconds")
    print(f"New SQLite database created at: {new_db_path}")

The main() function remains the same, using an async for loop to iterate over the progress objects yielded by the progress_generator.

Remember to run the main() function using an event loop, such as asyncio.run(main()).

This updated version should correctly estimate the progress while the vacuum subprocess is running.

2024-03-08T12:50:31

Prompt:

will that not block at the proc.communicate() line? I want to be able to show progress while progress is happening

Response:

You're absolutely right. Calling proc.communicate() will block until the subprocess completes, preventing progress updates from being shown while the vacuum operation is in progress.

To resolve this, we can use asyncio.create_task() to run the progress generator concurrently with the subprocess and use asyncio.gather() to wait for both tasks to complete.

Here's the updated version of the vacuum_sqlite_database function:

import asyncio
import os
import tempfile
import time

async def vacuum_sqlite_database(db_path):
    # Get the size of the original database file
    original_size = os.path.getsize(db_path)

    # Generate a unique filename for the new SQLite database in the /tmp directory
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False, dir='/tmp') as tmp_file:
        new_db_path = tmp_file.name

    # Create the Python command to execute in a subprocess
    command = f"python -c \"import sqlite3; " \
              f"conn = sqlite3.connect('{db_path}'); " \
              f"conn.execute('VACUUM INTO \\\"{new_db_path}\\\"'); " \
              f"conn.close()\""

    # Run the command in a subprocess using asyncio
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE
    )

    # Async generator to yield progress objects
    async def progress_generator():
        start_time = time.time()
        while True:
            try:
                new_size = os.path.getsize(new_db_path)
                elapsed_time = time.time() - start_time
                remaining_time = (elapsed_time / new_size) * (original_size - new_size) if new_size > 0 else None
                progress = {
                    'bytes_so_far': new_size,
                    'total_bytes': original_size,
                    'seconds_remaining': remaining_time
                }
                yield progress
            except FileNotFoundError:
                pass
            await asyncio.sleep(0.5)

    # Create a task for the progress generator
    progress_task = asyncio.create_task(progress_generator())

    # Wait for the subprocess to complete
    _, stderr = await proc.communicate()

    # Cancel the progress task
    progress_task.cancel()

    # Check if the subprocess completed successfully
    if proc.returncode == 0:
        return new_db_path, progress_task
    else:
        # Handle any errors that occurred during the subprocess execution
        error_message = stderr.decode().strip()
        raise RuntimeError(f"Failed to vacuum SQLite database: {error_message}")

In this updated version:

  1. The progress_generator() async generator function now runs indefinitely, yielding progress objects every 0.5 seconds until it is explicitly canceled.

  2. We create a task for the progress generator using asyncio.create_task() and assign it to the progress_task variable.

  3. We wait for the subprocess to complete using proc.communicate(), which will block until the vacuum operation is finished.

  4. After the subprocess completes, we cancel the progress_task to stop the progress generator.

  5. The vacuum_sqlite_database function returns a tuple containing the path to the new SQLite database and the progress_task.

You can use this updated function as follows:

async def main():
    original_db_path = '/path/to/original/database.db'
    new_db_path, progress_task = await vacuum_sqlite_database(original_db_path)
    async for progress in progress_task:
        bytes_so_far = progress['bytes_so_far']
        total_bytes = progress['total_bytes']
        seconds_remaining = progress['seconds_remaining']
        print(f"Progress: {bytes_so_far}/{total_bytes} bytes, Estimated time remaining: {seconds_remaining} seconds")
    print(f"New SQLite database created at: {new_db_path}")

In the main() function, we use an async for loop to iterate over the progress objects yielded by the progress_task. The progress updates will be shown while the vacuum operation is in progress.

Remember to run the main() function using an event loop, such as asyncio.run(main()).

This updated version should correctly display progress updates while the vacuum operation is running in the subprocess.
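As the comment above notes, none of these versions actually interleaves progress with the vacuum: this final one still awaits proc.communicate() before returning, cancels the very task it returns, and a Task is not async-iterable in any case. For reference, a sketch of one shape that does work — a single async generator (the name vacuum_with_progress is made up here) that starts the subprocess, polls the output file, and yields until the process exits; it uses an argument list rather than a shell string and assumes SQLite 3.27+:

```python
import asyncio
import os
import sys
import tempfile
import time

async def vacuum_with_progress(db_path):
    # Async generator: yields progress dicts while the vacuum runs,
    # then a final {'done': True, 'path': ...} once it has succeeded.
    original_size = os.path.getsize(db_path)
    with tempfile.NamedTemporaryFile(suffix=".db", delete=False, dir="/tmp") as tmp:
        new_db_path = tmp.name

    code = (
        "import sqlite3, sys; "
        "conn = sqlite3.connect(sys.argv[1]); "
        "conn.execute('VACUUM INTO ?', (sys.argv[2],)); "
        "conn.close()"
    )
    proc = await asyncio.create_subprocess_exec(
        sys.executable, "-c", code, db_path, new_db_path,
        stderr=asyncio.subprocess.PIPE,
    )

    start = time.monotonic()
    waiter = asyncio.ensure_future(proc.wait())
    while not waiter.done():
        try:
            new_size = os.path.getsize(new_db_path)
        except FileNotFoundError:
            new_size = 0
        elapsed = time.monotonic() - start
        # Proportional extrapolation from the average copy rate so far.
        remaining = (elapsed / new_size) * (original_size - new_size) if new_size else None
        yield {
            "bytes_so_far": new_size,
            "total_bytes": original_size,
            "seconds_remaining": remaining,
        }
        # Sleep up to half a second, waking immediately if the process exits.
        await asyncio.wait({waiter}, timeout=0.5)

    stderr = await proc.stderr.read()
    if proc.returncode != 0:
        raise RuntimeError(f"Failed to vacuum SQLite database: {stderr.decode().strip()}")
    yield {"done": True, "path": new_db_path}
```

The caller then needs only one async for loop — there is no separate returned path and generator to juggle, and nothing blocks before iteration starts:

```python
async def main():
    async for progress in vacuum_with_progress("/path/to/original/database.db"):
        print(progress)
```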
