# From https://stackoverflow.com/a/68876046/1319998, which is itself inspired by https://stackoverflow.com/a/68814418/1319998
from contextlib import contextmanager | |
from collections import namedtuple | |
from ctypes import cdll, byref, string_at, c_char_p, c_int, c_double, c_int64, c_void_p | |
from ctypes.util import find_library | |
from sys import platform | |
def query(db_file, sql, params=()):
    """Run one SQL statement against a SQLite file via ctypes and yield its rows.

    This is a generator: nothing is loaded, opened or executed until the
    first row is requested, and the statement and connection are released
    when the generator is exhausted or closed.

    Args:
        db_file: Path of the database file (str). The file is opened with
            SQLITE_OPEN_READWRITE only, i.e. without the create flag.
        sql: Text of the SQL statement to prepare and step through.
        params: Positional parameters, bound in order to placeholders
            1..len(params). Supported types are exactly int, float, str,
            bytes and NoneType.

    Yields:
        One ``Row`` namedtuple per result row. Fields are named after the
        result columns; names that are not valid Python identifiers (or are
        duplicated) are replaced by positional names such as ``_0``.

    Raises:
        Exception: If SQLite reports an error while opening, preparing,
            binding, stepping, finalizing or closing.
        KeyError: If a parameter has an unsupported type.
        OverflowError: If an int parameter does not fit in a signed 64-bit
            integer.
    """
    libsqlite3 = cdll.LoadLibrary(find_library('sqlite3'))

    # ctypes assumes every foreign function returns a C int unless told
    # otherwise, so pointer/64-bit/double results must be declared.
    libsqlite3.sqlite3_errstr.restype = c_char_p
    libsqlite3.sqlite3_errmsg.restype = c_char_p
    libsqlite3.sqlite3_column_name.restype = c_char_p
    libsqlite3.sqlite3_column_double.restype = c_double
    libsqlite3.sqlite3_column_int64.restype = c_int64
    # c_void_p (not c_char_p) so the raw address is kept and values with
    # embedded NUL bytes can be read in full with string_at().
    libsqlite3.sqlite3_column_blob.restype = c_void_p
    libsqlite3.sqlite3_column_text.restype = c_void_p
    libsqlite3.sqlite3_column_bytes.restype = c_int

    SQLITE_ROW = 100
    SQLITE_DONE = 101
    # The "destructor" sentinel is a pointer-sized -1; wrap it so ctypes
    # passes a full-width pointer rather than a C int.
    SQLITE_TRANSIENT = c_void_p(-1)
    SQLITE_OPEN_READWRITE = 0x00000002
    INT64_MIN = -(2 ** 63)
    INT64_MAX = 2 ** 63 - 1

    # No argtypes are declared, so values are wrapped explicitly: a bare
    # Python int would be passed as a C int and a bare float is rejected
    # by ctypes altogether.
    def bind_int(pp_stmt, i, value):
        if not INT64_MIN <= value <= INT64_MAX:
            raise OverflowError('integer parameter does not fit in 64 bits')
        return libsqlite3.sqlite3_bind_int64(pp_stmt, i, c_int64(value))

    def bind_float(pp_stmt, i, value):
        return libsqlite3.sqlite3_bind_double(pp_stmt, i, c_double(value))

    def bind_text(pp_stmt, i, value):
        encoded = value.encode('utf-8')
        return libsqlite3.sqlite3_bind_text(
            pp_stmt, i, encoded, len(encoded), SQLITE_TRANSIENT)

    def bind_blob(pp_stmt, i, value):
        return libsqlite3.sqlite3_bind_blob(
            pp_stmt, i, value, len(value), SQLITE_TRANSIENT)

    def bind_null(pp_stmt, i, _):
        return libsqlite3.sqlite3_bind_null(pp_stmt, i)

    # Dispatch on the exact parameter type (subclasses such as bool are
    # deliberately not matched).
    bind = {
        type(0): bind_int,
        type(0.0): bind_float,
        type(''): bind_text,
        type(b''): bind_blob,
        type(None): bind_null,
    }

    def read_bytes(fetch_pointer, pp_stmt, i):
        # Fetch the pointer first and the length second, so the length
        # refers to the representation the pointer call produced.
        pointer = fetch_pointer(pp_stmt, i)
        size = libsqlite3.sqlite3_column_bytes(pp_stmt, i)
        return string_at(pointer, size) if pointer else b''

    def extract_text(pp_stmt, i):
        # sqlite3_column_text always hands back UTF-8, whatever the
        # database's own text encoding is.
        return read_bytes(libsqlite3.sqlite3_column_text, pp_stmt, i).decode('utf-8')

    def extract_blob(pp_stmt, i):
        return read_bytes(libsqlite3.sqlite3_column_blob, pp_stmt, i)

    # Keys are the codes returned by sqlite3_column_type.
    extract = {
        1: libsqlite3.sqlite3_column_int64,   # SQLITE_INTEGER
        2: libsqlite3.sqlite3_column_double,  # SQLITE_FLOAT
        3: extract_text,                      # SQLITE_TEXT
        4: extract_blob,                      # SQLITE_BLOB
        5: lambda pp_stmt, i: None,           # SQLITE_NULL
    }

    def run(func, *args):
        # For calls made before a usable connection exists: describe the
        # failure from the result code alone.
        res = func(*args)
        if res != 0:
            raise Exception(libsqlite3.sqlite3_errstr(res).decode())

    def run_with_db(db, func, *args):
        # For calls on an open connection: use its detailed error message.
        if func(*args) != 0:
            raise Exception(libsqlite3.sqlite3_errmsg(db).decode())

    @contextmanager
    def get_db(db_file):
        db = c_void_p()
        run(libsqlite3.sqlite3_open_v2, db_file.encode(), byref(db), SQLITE_OPEN_READWRITE, None)
        try:
            yield db
        finally:
            run_with_db(db, libsqlite3.sqlite3_close, db)

    @contextmanager
    def get_pp_stmt(db, sql):
        pp_stmt = c_void_p()
        run_with_db(db, libsqlite3.sqlite3_prepare_v3, db, sql.encode(), -1, 0, byref(pp_stmt), None)
        try:
            yield pp_stmt
        finally:
            run_with_db(db, libsqlite3.sqlite3_finalize, pp_stmt)

    with get_db(db_file) as db, get_pp_stmt(db, sql) as pp_stmt:
        # SQLite placeholders are numbered from 1.
        for position, param in enumerate(params, start=1):
            run_with_db(db, bind[type(param)], pp_stmt, position, param)

        column_count = libsqlite3.sqlite3_column_count(pp_stmt)
        # rename=True keeps expressions such as "count(*)" from crashing
        # namedtuple; they become positional names instead.
        row_constructor = namedtuple(
            'Row',
            [
                libsqlite3.sqlite3_column_name(pp_stmt, i).decode()
                for i in range(column_count)
            ],
            rename=True,
        )

        while True:
            res = libsqlite3.sqlite3_step(pp_stmt)
            if res == SQLITE_DONE:
                break
            if res != SQLITE_ROW:
                raise Exception(libsqlite3.sqlite3_errmsg(db).decode())
            yield row_constructor(*(
                extract[libsqlite3.sqlite3_column_type(pp_stmt, i)](pp_stmt, i)
                for i in range(column_count)
            ))
This was super helpful....thanks! I used this code in a project that allows you to adjust the date/time/timezone of photos in Apple Photos app by directly modifying the Photos database. This is impossible with python's builtin sqlite3 due to persistent locks, trigger madness, etc. But it worked beautifully with your code!
@RhetTbull Pleased it’s helpful!
(I’m curious though, do you have more details on the locking/trigger issues it solves? I didn’t write it with any such issues in mind…)
(I’m curious though, do you have more details on the locking/trigger issues it solves? I didn’t write it with any such issues in mind…)
So I was wrong about the trigger issue (those are still there; I just hadn't run into them yet). Because the database I'm editing is actually an Apple CoreData database, it has triggers that call code in the parent app, and these obviously fail when triggered outside the app. That still happens with your implementation.
The locking issue is that Apple Photos has a background process photolibraryd
that maintains a lock on the database (Photos does background processing on the photos even if the app isn't closed, for example, to identify faces and other objects in the photos). Even in read-only mode, the Python sqlite3 package sometimes refuses to open the database due to the existing lock. The lock isn't exclusive and I could access the database directly from the command line using sqlite3
so I knew this wasn't a sqlite issue but a python issue. I was solving it by copying the database first which caused other issues. Using your code solved this and I'm now able to write to the database even while Photos is using it. (Whether or not this is wise is another story...)
@RhetTbull Thanks for the info...
(Whether or not this is wise is another story...)
I guess I don't know! I would have thought that the code here would respect locks just as Python's sqlite3 module would, so I think something's happening that I don't quite understand...
Thank you! This is informative. I'm looking for a way to implement table-valued functions without C (or Cython) modules, and this might be the ticket...
Usage example: