Skip to content

Instantly share code, notes, and snippets.

@simonw

simonw/limbo.md Secret

Last active December 10, 2024 19:37
Show Gist options
  • Save simonw/bd1822f372c406d17ed24772f8b93eea to your computer and use it in GitHub Desktop.
Save simonw/bd1822f372c406d17ed24772f8b93eea to your computer and use it in GitHub Desktop.

2024-12-10T19:30:02 conversation: 01jes0tvmjydapsw1wzamh67t1

Model: gemini-exp-1206

Prompt:

file-to-prompt limbo/bindings/python -c
<documents>
<document index="1">
<source>./Cargo.toml</source>
<document_content>
[package]
name = "py-limbo"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true

[lib]
name = "_limbo"
crate-type = ["cdylib"]

[features]
# must be enabled when building with `cargo build`, maturin enables this automatically
extension-module = ["pyo3/extension-module"]

[dependencies]
anyhow = "1.0"
limbo_core = { path = "../../core" }
pyo3 = { version = "0.22.4", features = ["anyhow"] }

[build-dependencies]
version_check = "0.9.5"
# used where logic has to be version/distribution specific, e.g. pypy
pyo3-build-config = { version = "0.22.4" }

</document_content>
</document>
<document index="2">
<source>./Makefile</source>
<document_content>
REQUIREMENTS     := requirements.txt
REQUIREMENTS_DEV := requirements-dev.txt

all: check-requirements install lint test
.PHONY: all

install:
	@echo "Installing requirements..."
	pip install -r requirements.txt -r requirements-dev.txt
.PHONY: install

test:
	@echo "Running tests..."
	pytest
.PHONY: test

lint:
	@echo "Running linters..."
	ruff check
	ruff format --diff
.PHONY: lint

check-requirements:
	@echo "Checking requirements files..."
	mkdir -p .tmp
	pip-compile pyproject.toml --quiet             --output-file=.tmp/$(REQUIREMENTS)
	pip-compile pyproject.toml --quiet --extra=dev --output-file=.tmp/$(REQUIREMENTS_DEV)
	diff -u $(REQUIREMENTS)     .tmp/$(REQUIREMENTS)     || (echo "$(REQUIREMENTS) doesn't match pyproject.toml"     && exit 1)
	diff -u $(REQUIREMENTS_DEV) .tmp/$(REQUIREMENTS_DEV) || (echo "$(REQUIREMENTS_DEV) doesn't match pyproject.toml" && exit 1)
	@echo "Requirements files match pyproject.toml"
.PHONY: check-requirements

compile-requirements:
	@echo "Compiling requirements files..."
	pip-compile pyproject.toml             --output-file=$(REQUIREMENTS)
	pip-compile pyproject.toml --extra=dev --output-file=$(REQUIREMENTS_DEV)
.PHONY: compile-requirements

</document_content>
</document>
<document index="3">
<source>./build.rs</source>
<document_content>
fn main() {
    pyo3_build_config::use_pyo3_cfgs();
    println!("cargo::rustc-check-cfg=cfg(allocator, values(\"default\", \"mimalloc\"))");
}

</document_content>
</document>
<document index="4">
<source>./example.py</source>
<document_content>
import limbo

con = limbo.connect("sqlite.db")
cur = con.cursor()
res = cur.execute("SELECT * FROM users")
print(res.fetchone())

</document_content>
</document>
<document index="5">
<source>./pyproject.toml</source>
<document_content>
[build-system]
requires = ['maturin>=1,<2', 'typing_extensions']
build-backend = 'maturin'

[project]
name = 'pylimbo'
description = "Limbo is a work-in-progress, in-process OLTP database management system, compatible with SQLite."
requires-python = '>=3.9'
classifiers = [
    'Development Status :: 3 - Alpha',
    'Programming Language :: Python',
    'Programming Language :: Python :: 3',
    'Programming Language :: Python :: 3 :: Only',
    'Programming Language :: Python :: 3.9',
    'Programming Language :: Python :: 3.10',
    'Programming Language :: Python :: 3.11',
    'Programming Language :: Python :: 3.12',
    'Programming Language :: Python :: 3.13',
    'Programming Language :: Rust',
    'License :: OSI Approved :: MIT License',
    'Operating System :: POSIX :: Linux',
    'Operating System :: Microsoft :: Windows',
    'Operating System :: MacOS',
    'Topic :: Database',
    'Topic :: Software Development :: Libraries',
    'Topic :: Software Development :: Libraries :: Python Modules',
    'Topic :: Database :: Database Engines/Servers',
]
dependencies = ['typing-extensions >=4.6.0,!=4.7.0']
dynamic = ['readme', 'version']

[project.optional-dependencies]
dev = ["mypy==1.11.0", "pytest==8.3.1", "pytest-cov==5.0.0", "ruff==0.5.4", "coverage==7.6.1"]

[project.urls]
Homepage = "https://github.com/penberg/limbo"
Source = "https://github.com/penberg/limbo"

[tool.maturin]
bindings = 'pyo3'
module-name = "limbo._limbo"
features = ["pyo3/extension-module"]

[tool.pip-tools]
strip-extras = true
header = false
upgrade = false

[tool.ruff]
line-length = 120

[tool.ruff.lint]
extend-select = [
    "E",   # pycodestyle errors
    "W",   # pycodestyle warings
    "F",   # pyflakes
    'Q',   # flake8-quotes
    'C90', # mccabe
    'I',   # isort
]

[tool.pytest.ini_options]
testpaths = 'tests'
log_format = '%(name)s %(levelname)s: %(message)s'

[tool.coverage.run]
source = ['limbo']
branch = true

[tool.coverage.report]
precision = 2
exclude_lines = [
    'pragma: no cover',
    'raise NotImplementedError',
    'if TYPE_CHECKING:',
    '@overload',
]

</document_content>
</document>
<document index="6">
<source>./requirements-dev.txt</source>
<document_content>
coverage==7.6.1
    # via
    #   limbo (pyproject.toml)
    #   pytest-cov
iniconfig==2.0.0
    # via pytest
mypy==1.11.0
    # via limbo (pyproject.toml)
mypy-extensions==1.0.0
    # via mypy
packaging==24.2
    # via pytest
pluggy==1.5.0
    # via pytest
pytest==8.3.1
    # via
    #   limbo (pyproject.toml)
    #   pytest-cov
pytest-cov==5.0.0
    # via limbo (pyproject.toml)
ruff==0.5.4
    # via limbo (pyproject.toml)
typing-extensions==4.12.2
    # via
    #   limbo (pyproject.toml)
    #   mypy

</document_content>
</document>
<document index="7">
<source>./requirements.txt</source>
<document_content>
typing-extensions==4.12.2
    # via limbo (pyproject.toml)

</document_content>
</document>
<document index="8">
<source>./tests/__init__.py</source>
<document_content>

</document_content>
</document>
<document index="9">
<source>./tests/test_database.py</source>
<document_content>
import sqlite3

import pytest

import limbo


@pytest.mark.parametrize("provider", ["sqlite3", "limbo"])
def test_fetchall_select_all_users(provider):
    conn = connect(provider, "tests/database.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")

    users = cursor.fetchall()
    assert users
    assert users == [(1, "alice"), (2, "bob")]


@pytest.mark.parametrize("provider", ["sqlite3", "limbo"])
def test_fetchall_select_user_ids(provider):
    conn = connect(provider, "tests/database.db")
    cursor = conn.cursor()
    cursor.execute("SELECT id FROM users")

    user_ids = cursor.fetchall()
    assert user_ids
    assert user_ids == [(1,), (2,)]


@pytest.mark.parametrize("provider", ["sqlite3", "limbo"])
def test_fetchone_select_all_users(provider):
    conn = connect(provider, "tests/database.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")

    alice = cursor.fetchone()
    assert alice
    assert alice == (1, "alice")

    bob = cursor.fetchone()
    assert bob
    assert bob == (2, "bob")


@pytest.mark.parametrize("provider", ["sqlite3", "limbo"])
def test_fetchone_select_max_user_id(provider):
    conn = connect(provider, "tests/database.db")
    cursor = conn.cursor()
    cursor.execute("SELECT MAX(id) FROM users")

    max_id = cursor.fetchone()
    assert max_id
    assert max_id == (2,)


def connect(provider, database):
    if provider == "limbo":
        return limbo.connect(database)
    if provider == "sqlite3":
        return sqlite3.connect(database)
    raise Exception(f"Provider `{provider}` is not supported")

</document_content>
</document>
<document index="10">
<source>./limbo/__init__.py</source>
<document_content>
from ._limbo import (
    Connection,
    Cursor,
    DatabaseError,
    DataError,
    IntegrityError,
    InterfaceError,
    InternalError,
    NotSupportedError,
    OperationalError,
    ProgrammingError,
    __version__,
    connect,
)

__all__ = [
    "__version__",
    "Connection",
    "Cursor",
    "InterfaceError",
    "DatabaseError",
    "DataError",
    "OperationalError",
    "IntegrityError",
    "InternalError",
    "ProgrammingError",
    "NotSupportedError",
    "connect",
]

</document_content>
</document>
<document index="11">
<source>./limbo/_limbo.pyi</source>
<document_content>
from typing import Any, List, Optional, Tuple

__version__: str

class Connection:
    def cursor(self) -> "Cursor":
        """
        Creates a new cursor object using this connection.

        :return: A new Cursor object.
        :raises InterfaceError: If the cursor cannot be created.
        """
        ...

    def close(self) -> None:
        """
        Closes the connection to the database.

        :raises OperationalError: If there is an error closing the connection.
        """
        ...

    def commit(self) -> None:
        """
        Commits the current transaction.

        :raises OperationalError: If there is an error during commit.
        """
        ...

    def rollback(self) -> None:
        """
        Rolls back the current transaction.

        :raises OperationalError: If there is an error during rollback.
        """
        ...

class Cursor:
    arraysize: int
    description: Optional[
        Tuple[
            str,
            str,
            Optional[str],
            Optional[str],
            Optional[str],
            Optional[str],
            Optional[str],
        ]
    ]
    rowcount: int

    def execute(self, sql: str, parameters: Optional[Tuple[Any, ...]] = None) -> "Cursor":
        """
        Prepares and executes a SQL statement using the connection.

        :param sql: The SQL query to execute.
        :param parameters: The parameters to substitute into the SQL query.
        :raises ProgrammingError: If there is an error in the SQL query.
        :raises OperationalError: If there is an error executing the query.
        :return: The cursor object.
        """
        ...

    def executemany(self, sql: str, parameters: Optional[List[Tuple[Any, ...]]] = None) -> None:
        """
        Executes a SQL command against all parameter sequences or mappings found in the sequence `parameters`.

        :param sql: The SQL command to execute.
        :param parameters: A list of parameter sequences or mappings.
        :raises ProgrammingError: If there is an error in the SQL query.
        :raises OperationalError: If there is an error executing the query.
        """
        ...

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        """
        Fetches the next row from the result set.

        :return: A tuple representing the next row, or None if no more rows are available.
        :raises OperationalError: If there is an error fetching the row.
        """
        ...

    def fetchall(self) -> List[Tuple[Any, ...]]:
        """
        Fetches all remaining rows from the result set.

        :return: A list of tuples, each representing a row in the result set.
        :raises OperationalError: If there is an error fetching the rows.
        """
        ...

    def fetchmany(self, size: Optional[int] = None) -> List[Tuple[Any, ...]]:
        """
        Fetches the next set of rows of a size specified by the `arraysize` property.

        :param size: Optional integer to specify the number of rows to fetch.
        :return: A list of tuples, each representing a row in the result set.
        :raises OperationalError: If there is an error fetching the rows.
        """
        ...

    def close(self) -> None:
        """
        Closes the cursor.

        :raises OperationalError: If there is an error closing the cursor.
        """
        ...

# Exception classes
class Warning(Exception):
    """Exception raised for important warnings like data truncations while inserting."""

    ...

class Error(Exception):
    """Base class for all other error exceptions. Catch all database-related errors using this class."""

    ...

class InterfaceError(Error):
    """Exception raised for errors related to the database interface rather than the database itself."""

    ...

class DatabaseError(Error):
    """Exception raised for errors that are related to the database."""

    ...

class DataError(DatabaseError):
    """
    Exception raised for errors due to problems with the processed data like division by zero, numeric value out of
    range, etc.
    """

    ...

class OperationalError(DatabaseError):
    """
    Exception raised for errors related to the database’s operation, not necessarily under the programmer's control.
    """

    ...

class IntegrityError(DatabaseError):
    """Exception raised when the relational integrity of the database is affected, e.g., a foreign key check fails."""

    ...

class InternalError(DatabaseError):
    """
    Exception raised when the database encounters an internal error, e.g., cursor is not valid anymore, transaction out
    of sync.
    """

    ...

class ProgrammingError(DatabaseError):
    """
    Exception raised for programming errors, e.g., table not found, syntax error in SQL, wrong number of parameters
    specified.
    """

    ...

class NotSupportedError(DatabaseError):
    """Exception raised when a method or database API is used which is not supported by the database."""

    ...

def connect(path: str) -> Connection:
    """
    Connects to a database at the specified path.

    :param path: The path to the database file.
    :return: A Connection object to the database.
    :raises InterfaceError: If the database cannot be connected.
    """
    ...

</document_content>
</document>
<document index="12">
<source>./limbo/py.typed</source>
<document_content>

</document_content>
</document>
<document index="13">
<source>./src/errors.rs</source>
<document_content>
use pyo3::create_exception;
use pyo3::exceptions::PyException;

create_exception!(
    limbo,
    Warning,
    PyException,
    "Exception raised for important warnings like data truncations while inserting."
);
create_exception!(limbo, Error, PyException, "Base class for all other error exceptions. Catch all database-related errors using this class.");

create_exception!(
    limbo,
    InterfaceError,
    Error,
    "Raised for errors related to the database interface rather than the database itself."
);
create_exception!(
    limbo,
    DatabaseError,
    Error,
    "Raised for errors that are related to the database."
);

create_exception!(limbo, DataError, DatabaseError, "Raised for errors due to problems with the processed data like division by zero, numeric value out of range, etc.");
create_exception!(limbo, OperationalError, DatabaseError, "Raised for errors related to the database’s operation, not necessarily under the programmer's control.");
create_exception!(limbo, IntegrityError, DatabaseError, "Raised when the relational integrity of the database is affected, e.g., a foreign key check fails.");
create_exception!(limbo, InternalError, DatabaseError, "Raised when the database encounters an internal error, e.g., cursor is not valid anymore, transaction out of sync.");
create_exception!(limbo, ProgrammingError, DatabaseError, "Raised for programming errors, e.g., table not found, syntax error in SQL, wrong number of parameters specified.");
create_exception!(
    limbo,
    NotSupportedError,
    DatabaseError,
    "Raised when a method or database API is used which is not supported by the database."
);

</document_content>
</document>
<document index="14">
<source>./src/lib.rs</source>
<document_content>
use anyhow::Result;
use errors::*;
use limbo_core::IO;
use pyo3::prelude::*;
use pyo3::types::PyList;
use pyo3::types::PyTuple;
use std::rc::Rc;
use std::sync::{Arc, Mutex};

mod errors;

#[pyclass]
#[derive(Clone, Debug)]
struct Description {
    #[pyo3(get)]
    name: String,
    #[pyo3(get)]
    type_code: String,
    #[pyo3(get)]
    display_size: Option<String>,
    #[pyo3(get)]
    internal_size: Option<String>,
    #[pyo3(get)]
    precision: Option<String>,
    #[pyo3(get)]
    scale: Option<String>,
    #[pyo3(get)]
    null_ok: Option<String>,
}

impl IntoPy<Py<PyTuple>> for Description {
    fn into_py(self, py: Python<'_>) -> Py<PyTuple> {
        PyTuple::new_bound(
            py,
            vec![
                self.name.into_py(py),
                self.type_code.into_py(py),
                self.display_size.into_py(py),
                self.internal_size.into_py(py),
                self.precision.into_py(py),
                self.scale.into_py(py),
                self.null_ok.into_py(py),
            ],
        )
        .into()
    }
}

#[pyclass]
pub struct Cursor {
    /// This read/write attribute specifies the number of rows to fetch at a time with `.fetchmany()`.
    /// It defaults to `1`, meaning it fetches a single row at a time.
    #[pyo3(get)]
    arraysize: i64,

    conn: Connection,

    /// The `.description` attribute is a read-only sequence of 7-item, each describing a column in the result set:
    ///
    /// - `name`: The column's name (always present).
    /// - `type_code`: The data type code (always present).
    /// - `display_size`: Column's display size (optional).
    /// - `internal_size`: Column's internal size (optional).
    /// - `precision`: Numeric precision (optional).
    /// - `scale`: Numeric scale (optional).
    /// - `null_ok`: Indicates if null values are allowed (optional).
    ///
    /// The `name` and `type_code` fields are mandatory; others default to `None` if not applicable.
    ///
    /// This attribute is `None` for operations that do not return rows or if no `.execute*()` method has been invoked.
    #[pyo3(get)]
    description: Option<Description>,

    /// Read-only attribute that provides the number of modified rows for `INSERT`, `UPDATE`, `DELETE`,
    /// and `REPLACE` statements; it is `-1` for other statements, including CTE queries.
    /// It is only updated by the `execute()` and `executemany()` methods after the statement has run to completion.
    /// This means any resulting rows must be fetched for `rowcount` to be updated.
    #[pyo3(get)]
    rowcount: i64,

    smt: Option<Arc<Mutex<limbo_core::Statement>>>,
}

// SAFETY: The limbo_core crate guarantees that `Cursor` is thread-safe.
unsafe impl Send for Cursor {}

#[allow(unused_variables, clippy::arc_with_non_send_sync)]
#[pymethods]
impl Cursor {
    #[pyo3(signature = (sql, parameters=None))]
    pub fn execute(&mut self, sql: &str, parameters: Option<Py<PyTuple>>) -> Result<Self> {
        let stmt_is_dml = stmt_is_dml(sql);

        let conn_lock =
            self.conn.conn.lock().map_err(|_| {
                PyErr::new::<OperationalError, _>("Failed to acquire connection lock")
            })?;

        let statement = conn_lock.prepare(sql).map_err(|e| {
            PyErr::new::<ProgrammingError, _>(format!("Failed to prepare statement: {:?}", e))
        })?;

        self.smt = Some(Arc::new(Mutex::new(statement)));

        // TODO: use stmt_is_dml to set rowcount
        if stmt_is_dml {
            todo!()
        }

        Ok(Cursor {
            smt: self.smt.clone(),
            conn: self.conn.clone(),
            description: self.description.clone(),
            rowcount: self.rowcount,
            arraysize: self.arraysize,
        })
    }

    pub fn fetchone(&mut self, py: Python) -> Result<Option<PyObject>> {
        if let Some(smt) = &self.smt {
            let mut smt_lock = smt.lock().map_err(|_| {
                PyErr::new::<OperationalError, _>("Failed to acquire statement lock")
            })?;
            loop {
                match smt_lock.step().map_err(|e| {
                    PyErr::new::<OperationalError, _>(format!("Step error: {:?}", e))
                })? {
                    limbo_core::RowResult::Row(row) => {
                        let py_row = row_to_py(py, &row);
                        return Ok(Some(py_row));
                    }
                    limbo_core::RowResult::IO => {
                        self.conn.io.run_once().map_err(|e| {
                            PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
                        })?;
                    }
                    limbo_core::RowResult::Done => {
                        return Ok(None);
                    }
                }
            }
        } else {
            Err(PyErr::new::<ProgrammingError, _>("No statement prepared for execution").into())
        }
    }

    pub fn fetchall(&mut self, py: Python) -> Result<Vec<PyObject>> {
        let mut results = Vec::new();

        if let Some(smt) = &self.smt {
            let mut smt_lock = smt.lock().map_err(|_| {
                PyErr::new::<OperationalError, _>("Failed to acquire statement lock")
            })?;

            loop {
                match smt_lock.step().map_err(|e| {
                    PyErr::new::<OperationalError, _>(format!("Step error: {:?}", e))
                })? {
                    limbo_core::RowResult::Row(row) => {
                        let py_row = row_to_py(py, &row);
                        results.push(py_row);
                    }
                    limbo_core::RowResult::IO => {
                        self.conn.io.run_once().map_err(|e| {
                            PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
                        })?;
                    }
                    limbo_core::RowResult::Done => {
                        return Ok(results);
                    }
                }
            }
        } else {
            Err(PyErr::new::<ProgrammingError, _>("No statement prepared for execution").into())
        }
    }

    pub fn close(&self) -> Result<()> {
        todo!()
    }

    #[pyo3(signature = (sql, parameters=None))]
    pub fn executemany(&self, sql: &str, parameters: Option<Py<PyList>>) {
        todo!()
    }

    #[pyo3(signature = (size=None))]
    pub fn fetchmany(&self, size: Option<i64>) {
        todo!()
    }
}

fn stmt_is_dml(sql: &str) -> bool {
    let sql = sql.trim();
    let sql = sql.to_uppercase();
    sql.starts_with("INSERT") || sql.starts_with("UPDATE") || sql.starts_with("DELETE")
}

#[pyclass]
#[derive(Clone)]
pub struct Connection {
    conn: Arc<Mutex<Rc<limbo_core::Connection>>>,
    io: Arc<limbo_core::PlatformIO>,
}

// SAFETY: The limbo_core crate guarantees that `Connection` is thread-safe.
unsafe impl Send for Connection {}

#[pymethods]
impl Connection {
    pub fn cursor(&self) -> Result<Cursor> {
        Ok(Cursor {
            arraysize: 1,
            conn: self.clone(),
            description: None,
            rowcount: -1,
            smt: None,
        })
    }

    pub fn close(&self) {
        drop(self.conn.clone());
    }

    pub fn commit(&self) {
        todo!()
    }

    pub fn rollback(&self) {
        todo!()
    }
}

#[allow(clippy::arc_with_non_send_sync)]
#[pyfunction]
pub fn connect(path: &str) -> Result<Connection> {
    let io = Arc::new(limbo_core::PlatformIO::new().map_err(|e| {
        PyErr::new::<InterfaceError, _>(format!("IO initialization failed: {:?}", e))
    })?);
    let db = limbo_core::Database::open_file(io.clone(), path)
        .map_err(|e| PyErr::new::<DatabaseError, _>(format!("Failed to open database: {:?}", e)))?;
    let conn: Rc<limbo_core::Connection> = db.connect();
    Ok(Connection {
        conn: Arc::new(Mutex::new(conn)),
        io,
    })
}

fn row_to_py(py: Python, row: &limbo_core::Row) -> PyObject {
    let py_values: Vec<PyObject> = row
        .values
        .iter()
        .map(|value| match value {
            limbo_core::Value::Null => py.None(),
            limbo_core::Value::Integer(i) => i.to_object(py),
            limbo_core::Value::Float(f) => f.to_object(py),
            limbo_core::Value::Text(s) => s.to_object(py),
            limbo_core::Value::Blob(b) => b.to_object(py),
        })
        .collect();

    PyTuple::new_bound(py, &py_values).to_object(py)
}

#[pymodule]
fn _limbo(m: &Bound<PyModule>) -> PyResult<()> {
    m.add("__version__", env!("CARGO_PKG_VERSION"))?;
    m.add_class::<Connection>()?;
    m.add_class::<Cursor>()?;
    m.add_function(wrap_pyfunction!(connect, m)?)?;
    m.add("Warning", m.py().get_type_bound::<Warning>())?;
    m.add("Error", m.py().get_type_bound::<Error>())?;
    m.add("InterfaceError", m.py().get_type_bound::<InterfaceError>())?;
    m.add("DatabaseError", m.py().get_type_bound::<DatabaseError>())?;
    m.add("DataError", m.py().get_type_bound::<DataError>())?;
    m.add(
        "OperationalError",
        m.py().get_type_bound::<OperationalError>(),
    )?;
    m.add("IntegrityError", m.py().get_type_bound::<IntegrityError>())?;
    m.add("InternalError", m.py().get_type_bound::<InternalError>())?;
    m.add(
        "ProgrammingError",
        m.py().get_type_bound::<ProgrammingError>(),
    )?;
    m.add(
        "NotSupportedError",
        m.py().get_type_bound::<NotSupportedError>(),
    )?;
    Ok(())
}

</document_content>
</document>
</documents>

System:

write extensive usage documentation in markdown, including realistic usage examples

Response:

PyLimbo Usage Documentation

PyLimbo is a Python interface to Limbo, a work-in-progress, in-process OLTP database management system compatible with SQLite. It's designed to provide a Pythonic way to interact with Limbo databases, adhering closely to the Python DB-API 2.0 specification (PEP 249).

Installation

PyLimbo is built using Rust and requires a Rust toolchain to build from source. It is recommended to use maturin for building and distributing PyLimbo.

pip install maturin
maturin build --release
pip install <path-to-wheel-file>

Ensure you have Rust installed and configured correctly. You can install Rust via rustup:

curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Basic Usage

Connecting to a Database

To connect to a Limbo database, use the connect() function, providing the path to the database file:

import limbo

# Connect to an existing database file
con = limbo.connect("mydatabase.db")

# Connect to an in-memory database
# con = limbo.connect(":memory:") # Not yet supported

Creating a Cursor

Once connected, you can create a cursor object to execute SQL commands:

cur = con.cursor()

Executing SQL Queries

Use the execute() method of the cursor to run SQL queries. Parameters can be passed to the query using standard Python DB-API parameter substitution:

# Create a table (example)
cur.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# Insert data
cur.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
cur.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))

# Select data
cur.execute("SELECT * FROM users")
results = cur.fetchall()
for row in results:
    print(row)

Fetching Results

PyLimbo provides several methods for retrieving query results:

  • fetchone(): Fetches the next row from the result set.
  • fetchall(): Fetches all remaining rows.
  • fetchmany(size=n): Fetches the next n rows (defaults to arraysize if size is not provided).
cur.execute("SELECT id, name FROM users WHERE id = ?", (1,))
user = cur.fetchone()
print(f"User ID: {user[0]}, Name: {user[1]}")

cur.execute("SELECT name FROM users")
all_names = cur.fetchall()
for name in all_names:
    print(name[0])

Closing the Connection

When finished, close the cursor and the connection:

cur.close()
con.close()

Using Parameters with execute()

The second argument to execute() is an optional sequence of parameters that will be bound to the query:

cur.execute("SELECT * FROM users WHERE name = ?", ("Alice",))

Using executemany()

The executemany() method allows executing a command against a sequence of parameter sets:

data = [
    ("Carol",),
    ("Dave",),
]
cur.executemany("INSERT INTO users (name) VALUES (?)", data)

Using fetchmany()

fetchmany() retrieves the next set of rows, with the number of rows determined by the size argument or the cursor's arraysize attribute:

cur.arraysize = 2  # Set to fetch 2 rows at a time
cur.execute("SELECT * FROM users")
while True:
    rows = cur.fetchmany()
    if not rows:
        break
    for row in rows:
        print(row)

Error Handling

PyLimbo defines a set of exception classes as per the DB-API 2.0 specification:

  • Warning: For important warnings like data truncations.
  • Error: Base class for all database-related errors.
  • InterfaceError: For errors related to the database interface.
  • DatabaseError: For errors related to the database itself.
  • DataError: For errors in processed data (e.g., division by zero).
  • OperationalError: For errors in the database's operation.
  • IntegrityError: For relational integrity errors (e.g., foreign key violations).
  • InternalError: For internal database errors.
  • ProgrammingError: For programming errors (e.g., SQL syntax errors).
  • NotSupportedError: For using unsupported features.
try:
    cur.execute("SELECT * FROM non_existent_table")
except limbo.ProgrammingError as e:
    print(f"Programming error: {e}")

Advanced Usage

Connection Object

  • cursor(): Creates a new cursor object.
  • close(): Closes the connection.
  • commit(): Commits the current transaction (not yet fully implemented).
  • rollback(): Rolls back the current transaction (not yet fully implemented).

Cursor Object

  • arraysize: Specifies the number of rows to fetch with fetchmany().
  • description: Provides metadata about the columns in a result set. It's a sequence of 7-item sequences, each containing:
    • name
    • type_code
    • display_size
    • internal_size
    • precision
    • scale
    • null_ok
  • rowcount: The number of rows affected by the last execute() or executemany().
  • execute(sql, parameters=None): Executes a SQL query.
  • executemany(sql, seq_of_parameters): Executes a SQL command against a sequence of parameter sets.
  • fetchone(): Fetches the next row.
  • fetchall(): Fetches all remaining rows.
  • fetchmany(size=None): Fetches the next set of rows.
  • close(): Closes the cursor.

Example: Testing with pytest

PyLimbo can be used alongside sqlite3 in tests to ensure compatibility:

import sqlite3
import pytest
import limbo

@pytest.mark.parametrize("provider", ["sqlite3", "limbo"])
def test_fetchall_select_all_users(provider):
    conn = connect(provider, "tests/database.db")
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    users = cursor.fetchall()
    assert users == [(1, "alice"), (2, "bob")]

def connect(provider, database):
    if provider == "limbo":
        return limbo.connect(database)
    if provider == "sqlite3":
        return sqlite3.connect(database)
    raise Exception(f"Provider `{provider}` is not supported")

Development and Contributing

PyLimbo is developed in Rust, and contributions are welcome. The project uses maturin for building and ruff for linting.

Development Setup

  1. Install Rust and maturin.
  2. Clone the repository.
  3. Create a virtual environment: python3 -m venv .venv
  4. Activate the virtual environment:
    • Linux/macOS: source .venv/bin/activate
    • Windows: .venv\Scripts\activate
  5. Install development dependencies: pip install -r requirements-dev.txt

Running Tests

make test

Linting

make lint

Compiling Requirements

make compile-requirements

Building the Package

maturin build --release

Limitations

  • Limbo, and therefore PyLimbo, is still under development. Some features of SQLite may not be fully supported yet.
  • In-memory databases are not currently supported.
  • Transactions (commit and rollback) are not fully implemented.

License

PyLimbo is licensed under the MIT License. See the pyproject.toml file for details.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment