Model: gemini-exp-1206
file-to-prompt limbo/bindings/python -c
<documents>
<document index="1">
<source>./Cargo.toml</source>
<document_content>
[package]
name = "py-limbo"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
repository.workspace = true
[lib]
name = "_limbo"
crate-type = ["cdylib"]
[features]
# must be enabled when building with `cargo build`, maturin enables this automatically
extension-module = ["pyo3/extension-module"]
[dependencies]
anyhow = "1.0"
limbo_core = { path = "../../core" }
pyo3 = { version = "0.22.4", features = ["anyhow"] }
[build-dependencies]
version_check = "0.9.5"
# used where logic has to be version/distribution specific, e.g. pypy
pyo3-build-config = { version = "0.22.4" }
</document_content>
</document>
<document index="2">
<source>./Makefile</source>
<document_content>
REQUIREMENTS := requirements.txt
REQUIREMENTS_DEV := requirements-dev.txt
all: check-requirements install lint test
.PHONY: all
install:
@echo "Installing requirements..."
pip install -r requirements.txt -r requirements-dev.txt
.PHONY: install
test:
@echo "Running tests..."
pytest
.PHONY: test
lint:
@echo "Running linters..."
ruff check
ruff format --diff
.PHONY: lint
check-requirements:
@echo "Checking requirements files..."
mkdir -p .tmp
pip-compile pyproject.toml --quiet --output-file=.tmp/$(REQUIREMENTS)
pip-compile pyproject.toml --quiet --extra=dev --output-file=.tmp/$(REQUIREMENTS_DEV)
diff -u $(REQUIREMENTS) .tmp/$(REQUIREMENTS) || (echo "$(REQUIREMENTS) doesn't match pyproject.toml" && exit 1)
diff -u $(REQUIREMENTS_DEV) .tmp/$(REQUIREMENTS_DEV) || (echo "$(REQUIREMENTS_DEV) doesn't match pyproject.toml" && exit 1)
@echo "Requirements files match pyproject.toml"
.PHONY: check-requirements
compile-requirements:
@echo "Compiling requirements files..."
pip-compile pyproject.toml --output-file=$(REQUIREMENTS)
pip-compile pyproject.toml --extra=dev --output-file=$(REQUIREMENTS_DEV)
.PHONY: compile-requirements
</document_content>
</document>
<document index="3">
<source>./build.rs</source>
<document_content>
fn main() {
pyo3_build_config::use_pyo3_cfgs();
println!("cargo::rustc-check-cfg=cfg(allocator, values(\"default\", \"mimalloc\"))");
}
</document_content>
</document>
<document index="4">
<source>./example.py</source>
<document_content>
import limbo
con = limbo.connect("sqlite.db")
cur = con.cursor()
res = cur.execute("SELECT * FROM users")
print(res.fetchone())
</document_content>
</document>
<document index="5">
<source>./pyproject.toml</source>
<document_content>
[build-system]
requires = ['maturin>=1,<2', 'typing_extensions']
build-backend = 'maturin'
[project]
name = 'pylimbo'
description = "Limbo is a work-in-progress, in-process OLTP database management system, compatible with SQLite."
requires-python = '>=3.9'
classifiers = [
'Development Status :: 3 - Alpha',
'Programming Language :: Python',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Programming Language :: Python :: 3.12',
'Programming Language :: Python :: 3.13',
'Programming Language :: Rust',
'License :: OSI Approved :: MIT License',
'Operating System :: POSIX :: Linux',
'Operating System :: Microsoft :: Windows',
'Operating System :: MacOS',
'Topic :: Database',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules',
'Topic :: Database :: Database Engines/Servers',
]
dependencies = ['typing-extensions >=4.6.0,!=4.7.0']
dynamic = ['readme', 'version']
[project.optional-dependencies]
dev = ["mypy==1.11.0", "pytest==8.3.1", "pytest-cov==5.0.0", "ruff==0.5.4", "coverage==7.6.1"]
[project.urls]
Homepage = "https://github.com/penberg/limbo"
Source = "https://github.com/penberg/limbo"
[tool.maturin]
bindings = 'pyo3'
module-name = "limbo._limbo"
features = ["pyo3/extension-module"]
[tool.pip-tools]
strip-extras = true
header = false
upgrade = false
[tool.ruff]
line-length = 120
[tool.ruff.lint]
extend-select = [
"E", # pycodestyle errors
"W", # pycodestyle warings
"F", # pyflakes
'Q', # flake8-quotes
'C90', # mccabe
'I', # isort
]
[tool.pytest.ini_options]
testpaths = 'tests'
log_format = '%(name)s %(levelname)s: %(message)s'
[tool.coverage.run]
source = ['limbo']
branch = true
[tool.coverage.report]
precision = 2
exclude_lines = [
'pragma: no cover',
'raise NotImplementedError',
'if TYPE_CHECKING:',
'@overload',
]
</document_content>
</document>
<document index="6">
<source>./requirements-dev.txt</source>
<document_content>
coverage==7.6.1
# via
# limbo (pyproject.toml)
# pytest-cov
iniconfig==2.0.0
# via pytest
mypy==1.11.0
# via limbo (pyproject.toml)
mypy-extensions==1.0.0
# via mypy
packaging==24.2
# via pytest
pluggy==1.5.0
# via pytest
pytest==8.3.1
# via
# limbo (pyproject.toml)
# pytest-cov
pytest-cov==5.0.0
# via limbo (pyproject.toml)
ruff==0.5.4
# via limbo (pyproject.toml)
typing-extensions==4.12.2
# via
# limbo (pyproject.toml)
# mypy
</document_content>
</document>
<document index="7">
<source>./requirements.txt</source>
<document_content>
typing-extensions==4.12.2
# via limbo (pyproject.toml)
</document_content>
</document>
<document index="8">
<source>./tests/__init__.py</source>
<document_content>
</document_content>
</document>
<document index="9">
<source>./tests/test_database.py</source>
<document_content>
import sqlite3
import pytest
import limbo
@pytest.mark.parametrize("provider", ["sqlite3", "limbo"])
def test_fetchall_select_all_users(provider):
conn = connect(provider, "tests/database.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
assert users
assert users == [(1, "alice"), (2, "bob")]
@pytest.mark.parametrize("provider", ["sqlite3", "limbo"])
def test_fetchall_select_user_ids(provider):
conn = connect(provider, "tests/database.db")
cursor = conn.cursor()
cursor.execute("SELECT id FROM users")
user_ids = cursor.fetchall()
assert user_ids
assert user_ids == [(1,), (2,)]
@pytest.mark.parametrize("provider", ["sqlite3", "limbo"])
def test_fetchone_select_all_users(provider):
conn = connect(provider, "tests/database.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
alice = cursor.fetchone()
assert alice
assert alice == (1, "alice")
bob = cursor.fetchone()
assert bob
assert bob == (2, "bob")
@pytest.mark.parametrize("provider", ["sqlite3", "limbo"])
def test_fetchone_select_max_user_id(provider):
conn = connect(provider, "tests/database.db")
cursor = conn.cursor()
cursor.execute("SELECT MAX(id) FROM users")
max_id = cursor.fetchone()
assert max_id
assert max_id == (2,)
def connect(provider, database):
if provider == "limbo":
return limbo.connect(database)
if provider == "sqlite3":
return sqlite3.connect(database)
raise Exception(f"Provider `{provider}` is not supported")
</document_content>
</document>
<document index="10">
<source>./limbo/__init__.py</source>
<document_content>
from ._limbo import (
Connection,
Cursor,
DatabaseError,
DataError,
IntegrityError,
InterfaceError,
InternalError,
NotSupportedError,
OperationalError,
ProgrammingError,
__version__,
connect,
)
__all__ = [
"__version__",
"Connection",
"Cursor",
"InterfaceError",
"DatabaseError",
"DataError",
"OperationalError",
"IntegrityError",
"InternalError",
"ProgrammingError",
"NotSupportedError",
"connect",
]
</document_content>
</document>
<document index="11">
<source>./limbo/_limbo.pyi</source>
<document_content>
from typing import Any, List, Optional, Tuple
__version__: str
class Connection:
def cursor(self) -> "Cursor":
"""
Creates a new cursor object using this connection.
:return: A new Cursor object.
:raises InterfaceError: If the cursor cannot be created.
"""
...
def close(self) -> None:
"""
Closes the connection to the database.
:raises OperationalError: If there is an error closing the connection.
"""
...
def commit(self) -> None:
"""
Commits the current transaction.
:raises OperationalError: If there is an error during commit.
"""
...
def rollback(self) -> None:
"""
Rolls back the current transaction.
:raises OperationalError: If there is an error during rollback.
"""
...
class Cursor:
arraysize: int
description: Optional[
Tuple[
str,
str,
Optional[str],
Optional[str],
Optional[str],
Optional[str],
Optional[str],
]
]
rowcount: int
def execute(self, sql: str, parameters: Optional[Tuple[Any, ...]] = None) -> "Cursor":
"""
Prepares and executes a SQL statement using the connection.
:param sql: The SQL query to execute.
:param parameters: The parameters to substitute into the SQL query.
:raises ProgrammingError: If there is an error in the SQL query.
:raises OperationalError: If there is an error executing the query.
:return: The cursor object.
"""
...
def executemany(self, sql: str, parameters: Optional[List[Tuple[Any, ...]]] = None) -> None:
"""
Executes a SQL command against all parameter sequences or mappings found in the sequence `parameters`.
:param sql: The SQL command to execute.
:param parameters: A list of parameter sequences or mappings.
:raises ProgrammingError: If there is an error in the SQL query.
:raises OperationalError: If there is an error executing the query.
"""
...
def fetchone(self) -> Optional[Tuple[Any, ...]]:
"""
Fetches the next row from the result set.
:return: A tuple representing the next row, or None if no more rows are available.
:raises OperationalError: If there is an error fetching the row.
"""
...
def fetchall(self) -> List[Tuple[Any, ...]]:
"""
Fetches all remaining rows from the result set.
:return: A list of tuples, each representing a row in the result set.
:raises OperationalError: If there is an error fetching the rows.
"""
...
def fetchmany(self, size: Optional[int] = None) -> List[Tuple[Any, ...]]:
"""
Fetches the next set of rows of a size specified by the `arraysize` property.
:param size: Optional integer to specify the number of rows to fetch.
:return: A list of tuples, each representing a row in the result set.
:raises OperationalError: If there is an error fetching the rows.
"""
...
def close(self) -> None:
"""
Closes the cursor.
:raises OperationalError: If there is an error closing the cursor.
"""
...
# Exception classes
class Warning(Exception):
"""Exception raised for important warnings like data truncations while inserting."""
...
class Error(Exception):
"""Base class for all other error exceptions. Catch all database-related errors using this class."""
...
class InterfaceError(Error):
"""Exception raised for errors related to the database interface rather than the database itself."""
...
class DatabaseError(Error):
"""Exception raised for errors that are related to the database."""
...
class DataError(DatabaseError):
"""
Exception raised for errors due to problems with the processed data like division by zero, numeric value out of
range, etc.
"""
...
class OperationalError(DatabaseError):
"""
Exception raised for errors related to the database’s operation, not necessarily under the programmer's control.
"""
...
class IntegrityError(DatabaseError):
"""Exception raised when the relational integrity of the database is affected, e.g., a foreign key check fails."""
...
class InternalError(DatabaseError):
"""
Exception raised when the database encounters an internal error, e.g., cursor is not valid anymore, transaction out
of sync.
"""
...
class ProgrammingError(DatabaseError):
"""
Exception raised for programming errors, e.g., table not found, syntax error in SQL, wrong number of parameters
specified.
"""
...
class NotSupportedError(DatabaseError):
"""Exception raised when a method or database API is used which is not supported by the database."""
...
def connect(path: str) -> Connection:
"""
Connects to a database at the specified path.
:param path: The path to the database file.
:return: A Connection object to the database.
:raises InterfaceError: If the database cannot be connected.
"""
...
</document_content>
</document>
<document index="12">
<source>./limbo/py.typed</source>
<document_content>
</document_content>
</document>
<document index="13">
<source>./src/errors.rs</source>
<document_content>
use pyo3::create_exception;
use pyo3::exceptions::PyException;
create_exception!(
limbo,
Warning,
PyException,
"Exception raised for important warnings like data truncations while inserting."
);
create_exception!(limbo, Error, PyException, "Base class for all other error exceptions. Catch all database-related errors using this class.");
create_exception!(
limbo,
InterfaceError,
Error,
"Raised for errors related to the database interface rather than the database itself."
);
create_exception!(
limbo,
DatabaseError,
Error,
"Raised for errors that are related to the database."
);
create_exception!(limbo, DataError, DatabaseError, "Raised for errors due to problems with the processed data like division by zero, numeric value out of range, etc.");
create_exception!(limbo, OperationalError, DatabaseError, "Raised for errors related to the database’s operation, not necessarily under the programmer's control.");
create_exception!(limbo, IntegrityError, DatabaseError, "Raised when the relational integrity of the database is affected, e.g., a foreign key check fails.");
create_exception!(limbo, InternalError, DatabaseError, "Raised when the database encounters an internal error, e.g., cursor is not valid anymore, transaction out of sync.");
create_exception!(limbo, ProgrammingError, DatabaseError, "Raised for programming errors, e.g., table not found, syntax error in SQL, wrong number of parameters specified.");
create_exception!(
limbo,
NotSupportedError,
DatabaseError,
"Raised when a method or database API is used which is not supported by the database."
);
</document_content>
</document>
<document index="14">
<source>./src/lib.rs</source>
<document_content>
use anyhow::Result;
use errors::*;
use limbo_core::IO;
use pyo3::prelude::*;
use pyo3::types::PyList;
use pyo3::types::PyTuple;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
mod errors;
#[pyclass]
#[derive(Clone, Debug)]
struct Description {
#[pyo3(get)]
name: String,
#[pyo3(get)]
type_code: String,
#[pyo3(get)]
display_size: Option<String>,
#[pyo3(get)]
internal_size: Option<String>,
#[pyo3(get)]
precision: Option<String>,
#[pyo3(get)]
scale: Option<String>,
#[pyo3(get)]
null_ok: Option<String>,
}
impl IntoPy<Py<PyTuple>> for Description {
fn into_py(self, py: Python<'_>) -> Py<PyTuple> {
PyTuple::new_bound(
py,
vec![
self.name.into_py(py),
self.type_code.into_py(py),
self.display_size.into_py(py),
self.internal_size.into_py(py),
self.precision.into_py(py),
self.scale.into_py(py),
self.null_ok.into_py(py),
],
)
.into()
}
}
#[pyclass]
pub struct Cursor {
/// This read/write attribute specifies the number of rows to fetch at a time with `.fetchmany()`.
/// It defaults to `1`, meaning it fetches a single row at a time.
#[pyo3(get)]
arraysize: i64,
conn: Connection,
/// The `.description` attribute is a read-only sequence of 7-item, each describing a column in the result set:
///
/// - `name`: The column's name (always present).
/// - `type_code`: The data type code (always present).
/// - `display_size`: Column's display size (optional).
/// - `internal_size`: Column's internal size (optional).
/// - `precision`: Numeric precision (optional).
/// - `scale`: Numeric scale (optional).
/// - `null_ok`: Indicates if null values are allowed (optional).
///
/// The `name` and `type_code` fields are mandatory; others default to `None` if not applicable.
///
/// This attribute is `None` for operations that do not return rows or if no `.execute*()` method has been invoked.
#[pyo3(get)]
description: Option<Description>,
/// Read-only attribute that provides the number of modified rows for `INSERT`, `UPDATE`, `DELETE`,
/// and `REPLACE` statements; it is `-1` for other statements, including CTE queries.
/// It is only updated by the `execute()` and `executemany()` methods after the statement has run to completion.
/// This means any resulting rows must be fetched for `rowcount` to be updated.
#[pyo3(get)]
rowcount: i64,
smt: Option<Arc<Mutex<limbo_core::Statement>>>,
}
// SAFETY: The limbo_core crate guarantees that `Cursor` is thread-safe.
unsafe impl Send for Cursor {}
#[allow(unused_variables, clippy::arc_with_non_send_sync)]
#[pymethods]
impl Cursor {
#[pyo3(signature = (sql, parameters=None))]
pub fn execute(&mut self, sql: &str, parameters: Option<Py<PyTuple>>) -> Result<Self> {
let stmt_is_dml = stmt_is_dml(sql);
let conn_lock =
self.conn.conn.lock().map_err(|_| {
PyErr::new::<OperationalError, _>("Failed to acquire connection lock")
})?;
let statement = conn_lock.prepare(sql).map_err(|e| {
PyErr::new::<ProgrammingError, _>(format!("Failed to prepare statement: {:?}", e))
})?;
self.smt = Some(Arc::new(Mutex::new(statement)));
// TODO: use stmt_is_dml to set rowcount
if stmt_is_dml {
todo!()
}
Ok(Cursor {
smt: self.smt.clone(),
conn: self.conn.clone(),
description: self.description.clone(),
rowcount: self.rowcount,
arraysize: self.arraysize,
})
}
pub fn fetchone(&mut self, py: Python) -> Result<Option<PyObject>> {
if let Some(smt) = &self.smt {
let mut smt_lock = smt.lock().map_err(|_| {
PyErr::new::<OperationalError, _>("Failed to acquire statement lock")
})?;
loop {
match smt_lock.step().map_err(|e| {
PyErr::new::<OperationalError, _>(format!("Step error: {:?}", e))
})? {
limbo_core::RowResult::Row(row) => {
let py_row = row_to_py(py, &row);
return Ok(Some(py_row));
}
limbo_core::RowResult::IO => {
self.conn.io.run_once().map_err(|e| {
PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
})?;
}
limbo_core::RowResult::Done => {
return Ok(None);
}
}
}
} else {
Err(PyErr::new::<ProgrammingError, _>("No statement prepared for execution").into())
}
}
pub fn fetchall(&mut self, py: Python) -> Result<Vec<PyObject>> {
let mut results = Vec::new();
if let Some(smt) = &self.smt {
let mut smt_lock = smt.lock().map_err(|_| {
PyErr::new::<OperationalError, _>("Failed to acquire statement lock")
})?;
loop {
match smt_lock.step().map_err(|e| {
PyErr::new::<OperationalError, _>(format!("Step error: {:?}", e))
})? {
limbo_core::RowResult::Row(row) => {
let py_row = row_to_py(py, &row);
results.push(py_row);
}
limbo_core::RowResult::IO => {
self.conn.io.run_once().map_err(|e| {
PyErr::new::<OperationalError, _>(format!("IO error: {:?}", e))
})?;
}
limbo_core::RowResult::Done => {
return Ok(results);
}
}
}
} else {
Err(PyErr::new::<ProgrammingError, _>("No statement prepared for execution").into())
}
}
pub fn close(&self) -> Result<()> {
todo!()
}
#[pyo3(signature = (sql, parameters=None))]
pub fn executemany(&self, sql: &str, parameters: Option<Py<PyList>>) {
todo!()
}
#[pyo3(signature = (size=None))]
pub fn fetchmany(&self, size: Option<i64>) {
todo!()
}
}
fn stmt_is_dml(sql: &str) -> bool {
let sql = sql.trim();
let sql = sql.to_uppercase();
sql.starts_with("INSERT") || sql.starts_with("UPDATE") || sql.starts_with("DELETE")
}
#[pyclass]
#[derive(Clone)]
pub struct Connection {
conn: Arc<Mutex<Rc<limbo_core::Connection>>>,
io: Arc<limbo_core::PlatformIO>,
}
// SAFETY: The limbo_core crate guarantees that `Connection` is thread-safe.
unsafe impl Send for Connection {}
#[pymethods]
impl Connection {
pub fn cursor(&self) -> Result<Cursor> {
Ok(Cursor {
arraysize: 1,
conn: self.clone(),
description: None,
rowcount: -1,
smt: None,
})
}
pub fn close(&self) {
drop(self.conn.clone());
}
pub fn commit(&self) {
todo!()
}
pub fn rollback(&self) {
todo!()
}
}
#[allow(clippy::arc_with_non_send_sync)]
#[pyfunction]
pub fn connect(path: &str) -> Result<Connection> {
let io = Arc::new(limbo_core::PlatformIO::new().map_err(|e| {
PyErr::new::<InterfaceError, _>(format!("IO initialization failed: {:?}", e))
})?);
let db = limbo_core::Database::open_file(io.clone(), path)
.map_err(|e| PyErr::new::<DatabaseError, _>(format!("Failed to open database: {:?}", e)))?;
let conn: Rc<limbo_core::Connection> = db.connect();
Ok(Connection {
conn: Arc::new(Mutex::new(conn)),
io,
})
}
fn row_to_py(py: Python, row: &limbo_core::Row) -> PyObject {
let py_values: Vec<PyObject> = row
.values
.iter()
.map(|value| match value {
limbo_core::Value::Null => py.None(),
limbo_core::Value::Integer(i) => i.to_object(py),
limbo_core::Value::Float(f) => f.to_object(py),
limbo_core::Value::Text(s) => s.to_object(py),
limbo_core::Value::Blob(b) => b.to_object(py),
})
.collect();
PyTuple::new_bound(py, &py_values).to_object(py)
}
#[pymodule]
fn _limbo(m: &Bound<PyModule>) -> PyResult<()> {
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add_class::<Connection>()?;
m.add_class::<Cursor>()?;
m.add_function(wrap_pyfunction!(connect, m)?)?;
m.add("Warning", m.py().get_type_bound::<Warning>())?;
m.add("Error", m.py().get_type_bound::<Error>())?;
m.add("InterfaceError", m.py().get_type_bound::<InterfaceError>())?;
m.add("DatabaseError", m.py().get_type_bound::<DatabaseError>())?;
m.add("DataError", m.py().get_type_bound::<DataError>())?;
m.add(
"OperationalError",
m.py().get_type_bound::<OperationalError>(),
)?;
m.add("IntegrityError", m.py().get_type_bound::<IntegrityError>())?;
m.add("InternalError", m.py().get_type_bound::<InternalError>())?;
m.add(
"ProgrammingError",
m.py().get_type_bound::<ProgrammingError>(),
)?;
m.add(
"NotSupportedError",
m.py().get_type_bound::<NotSupportedError>(),
)?;
Ok(())
}
</document_content>
</document>
</documents>
write extensive usage documentation in markdown, including realistic usage examples
PyLimbo is a Python interface to Limbo, a work-in-progress, in-process OLTP database management system compatible with SQLite. It's designed to provide a Pythonic way to interact with Limbo databases, adhering closely to the Python DB-API 2.0 specification (PEP 249).
PyLimbo is built using Rust and requires a Rust toolchain to build from source. It is recommended to use maturin
for building and distributing PyLimbo.
pip install maturin
maturin build --release
pip install <path-to-wheel-file>
Ensure you have Rust installed and configured correctly. You can install Rust via rustup
:
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
To connect to a Limbo database, use the connect()
function, providing the path to the database file:
import limbo
# Connect to an existing database file
con = limbo.connect("mydatabase.db")
# Connect to an in-memory database
# con = limbo.connect(":memory:") # Not yet supported
Once connected, you can create a cursor object to execute SQL commands:
cur = con.cursor()
Use the execute()
method of the cursor to run SQL queries. Parameters can be passed to the query using standard Python DB-API parameter substitution:
# Create a table (example)
cur.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
# Insert data
cur.execute("INSERT INTO users (name) VALUES (?)", ("Alice",))
cur.execute("INSERT INTO users (name) VALUES (?)", ("Bob",))
# Select data
cur.execute("SELECT * FROM users")
results = cur.fetchall()
for row in results:
print(row)
PyLimbo provides several methods for retrieving query results:
fetchone()
: Fetches the next row from the result set.fetchall()
: Fetches all remaining rows.fetchmany(size=n)
: Fetches the nextn
rows (defaults toarraysize
ifsize
is not provided).
cur.execute("SELECT id, name FROM users WHERE id = ?", (1,))
user = cur.fetchone()
print(f"User ID: {user[0]}, Name: {user[1]}")
cur.execute("SELECT name FROM users")
all_names = cur.fetchall()
for name in all_names:
print(name[0])
When finished, close the cursor and the connection:
cur.close()
con.close()
The second argument to execute()
is an optional sequence of parameters that will be bound to the query:
cur.execute("SELECT * FROM users WHERE name = ?", ("Alice",))
The executemany()
method allows executing a command against a sequence of parameter sets:
data = [
("Carol",),
("Dave",),
]
cur.executemany("INSERT INTO users (name) VALUES (?)", data)
fetchmany()
retrieves the next set of rows, with the number of rows determined by the size
argument or the cursor's arraysize
attribute:
cur.arraysize = 2 # Set to fetch 2 rows at a time
cur.execute("SELECT * FROM users")
while True:
rows = cur.fetchmany()
if not rows:
break
for row in rows:
print(row)
PyLimbo defines a set of exception classes as per the DB-API 2.0 specification:
Warning
: For important warnings like data truncations.Error
: Base class for all database-related errors.InterfaceError
: For errors related to the database interface.DatabaseError
: For errors related to the database itself.DataError
: For errors in processed data (e.g., division by zero).OperationalError
: For errors in the database's operation.IntegrityError
: For relational integrity errors (e.g., foreign key violations).InternalError
: For internal database errors.ProgrammingError
: For programming errors (e.g., SQL syntax errors).NotSupportedError
: For using unsupported features.
try:
cur.execute("SELECT * FROM non_existent_table")
except limbo.ProgrammingError as e:
print(f"Programming error: {e}")
cursor()
: Creates a new cursor object.close()
: Closes the connection.commit()
: Commits the current transaction (not yet fully implemented).rollback()
: Rolls back the current transaction (not yet fully implemented).
arraysize
: Specifies the number of rows to fetch withfetchmany()
.description
: Provides metadata about the columns in a result set. It's a sequence of 7-item sequences, each containing:name
type_code
display_size
internal_size
precision
scale
null_ok
rowcount
: The number of rows affected by the lastexecute()
orexecutemany()
.execute(sql, parameters=None)
: Executes a SQL query.executemany(sql, seq_of_parameters)
: Executes a SQL command against a sequence of parameter sets.fetchone()
: Fetches the next row.fetchall()
: Fetches all remaining rows.fetchmany(size=None)
: Fetches the next set of rows.close()
: Closes the cursor.
PyLimbo can be used alongside sqlite3
in tests to ensure compatibility:
import sqlite3
import pytest
import limbo
@pytest.mark.parametrize("provider", ["sqlite3", "limbo"])
def test_fetchall_select_all_users(provider):
conn = connect(provider, "tests/database.db")
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
users = cursor.fetchall()
assert users == [(1, "alice"), (2, "bob")]
def connect(provider, database):
if provider == "limbo":
return limbo.connect(database)
if provider == "sqlite3":
return sqlite3.connect(database)
raise Exception(f"Provider `{provider}` is not supported")
PyLimbo is developed in Rust, and contributions are welcome. The project uses maturin
for building and ruff
for linting.
- Install Rust and
maturin
. - Clone the repository.
- Create a virtual environment:
python3 -m venv .venv
- Activate the virtual environment:
- Linux/macOS:
source .venv/bin/activate
- Windows:
.venv\Scripts\activate
- Linux/macOS:
- Install development dependencies:
pip install -r requirements-dev.txt
make test
make lint
make compile-requirements
maturin build --release
- Limbo, and therefore PyLimbo, is still under development. Some features of SQLite may not be fully supported yet.
- In-memory databases are not currently supported.
- Transactions (commit and rollback) are not fully implemented.
PyLimbo is licensed under the MIT License. See the pyproject.toml
file for details.