A function for passing connection parameters to SQLite in the context of SQLAlchemy. See http://stronglyemergent.com/blog/2016/how-to-make-sqlalchemy-pass-connection-flags-to-sqlite/ for explanation.
import os.path | |
import re | |
import sqlite3 | |
from sqlalchemy import create_engine | |
def create_sqlite_engine(sqlite_path,
                         read_only=None, create_db=None, **kwargs):
    """
    Open SQLite with appropriate use of connection flags.

    This is harder than it sounds.

    :param sqlite_path: Either ``':memory:'`` or an absolute filesystem path
        (the output of ``os.path.abspath()``).
    :param read_only: If truthy, open the database with ``mode=ro``. Takes
        priority over ``create_db``.
    :param create_db: If truthy (and ``read_only`` is falsy), open with
        ``mode=rwc``; otherwise ``mode=rw`` is used.
    :param kwargs: Extra query parameters to put on the SQLite ``file:`` URI.
        Any caller-supplied ``mode`` is overridden by the flags above.
    :returns: Whatever ``sqlalchemy.engine.create_engine`` returns.

    For ``':memory:'`` and for SQLite older than 3.7.7, every flag is
    ignored and a plain SQLAlchemy URL is used instead.
    """
    # Function-scope imports: the module header does not bind these names,
    # and looking them up here keeps this function self-contained.
    import logging

    import sqlalchemy.engine
    try:
        # Python 3
        from urllib.parse import urlencode, urlunparse
    except ImportError:
        # Python 2
        from urllib import urlencode
        from urlparse import urlunparse

    log = logging.getLogger(__name__)

    log.info("Trying to connect to SQLite (%r)", sqlite_path)
    sqlite_prefix = 'sqlite:///'

    # For an in-memory DB, ignore the flags
    if sqlite_path == ':memory:':
        log.info("Fallback: :memory: DB.")
        return sqlalchemy.engine.create_engine(sqlite_prefix)

    # For atrociously old versions of SQLite, ignore the flags
    if sqlite3.sqlite_version_info < (3, 7, 7):
        log.warning("Found old SQLite version: "
                    "using it, but have to ignore %r", kwargs)
        return sqlalchemy.engine.create_engine(sqlite_prefix + sqlite_path)

    # Because the read_only and create_db modes are mutually exclusive, we
    # give priority to the more restrictive mode.
    if read_only:
        kwargs['mode'] = 'ro'
    elif create_db:
        kwargs['mode'] = 'rwc'
    else:
        kwargs['mode'] = 'rw'

    # Normalize sqlite_path from a platform-bound file path to a
    # platform-agnostic file:// URI (see https://www.sqlite.org/uri.html for
    # details of SQLite's file:// URI requirements). '%' has to be escaped
    # first: SQLite decodes %HH sequences in URI filenames, and escaping it
    # later would mangle the escapes produced for '?' and '#'.
    for raw, escaped in (('%', '%25'), ('?', '%3f'), ('#', '%23')):
        sqlite_path = sqlite_path.replace(raw, escaped)

    # os.path is looked up at call time on purpose, so the separator rules
    # follow whichever path module is currently installed as os.path.
    drive, remainder = os.path.splitdrive(sqlite_path)
    path_segments = []
    path_head, path_tail = os.path.split(remainder)
    while path_tail != '':
        # Segments are collected leaf-first and reversed once afterwards.
        path_segments.append(path_tail)
        path_head, path_tail = os.path.split(path_head)
    if drive != '':
        path_segments.append(drive)
    path_segments.reverse()

    file_uri = urlunparse(
        ("file",                    # Scheme
         "",                        # Netloc (localhost)
         "/".join(path_segments),   # Path
         "",                        # Params
         urlencode(kwargs),         # Query
         "",                        # Fragment
         ))
    log.debug("Database URI: %r", file_uri)

    def _sqlite_conn_creator():
        """Open a DB-API connection to ``file_uri`` with URI handling on."""
        try:
            log.debug("Trying the easy way...")
            # In Python 3.4+, we can do this the easy way.
            conn = sqlite3.connect(file_uri, uri=True)
        except TypeError:
            # sqlite3.connect() rejected the ``uri`` keyword.
            log.debug("Trying ctypes hackage...")
            # The ctypes way is not the easy way (nor is it the "compatible
            # with anything but CPython" way).
            import ctypes
            import ctypes.util
            sqlite = ctypes.CDLL(ctypes.util.find_library('sqlite3'))
            # https://www.sqlite.org/c3ref/c_config_covering_index_scan.html
            SQLITE_CONFIG_URI = 17
            # NOTE(review): the return code is not checked here, so a failed
            # sqlite3_config() call goes unnoticed -- confirm that is intended.
            sqlite.sqlite3_config(SQLITE_CONFIG_URI, 1)
            conn = sqlite3.connect(file_uri)
        return conn

    # Dust our hands off, give the caller what they asked for, pretend we
    # didn't do anything disreputable just now.
    return sqlalchemy.engine.create_engine(sqlite_prefix,
                                           creator=_sqlite_conn_creator)
from collections import namedtuple | |
import ctypes | |
import ctypes.util | |
import os | |
import ntpath | |
import posixpath | |
import sqlite3 | |
try: | |
# Python 3 | |
import urllib.parse as urlparse | |
except ImportError: | |
# Python 2 | |
import urlparse | |
import pytest | |
import sqlalchemy.engine | |
from create_sqlite_engine import create_sqlite_engine | |
@pytest.fixture(scope="function")
def fxt_create_engine(monkeypatch):
    """Replace ``sqlalchemy.engine.create_engine`` with a stub that hands
    back the single argument it was called with."""
    def _echo_url(path):
        return path

    monkeypatch.setattr(sqlalchemy.engine, "create_engine", _echo_url)
@pytest.fixture(scope="function")
def fxt_create_engine_kwarg(monkeypatch):
    """Replace ``sqlalchemy.engine.create_engine`` with a stub that returns
    its URL and ``creator`` arguments as a pair."""
    def _echo_url_and_creator(path, creator=None):
        return (path, creator)

    monkeypatch.setattr(
        sqlalchemy.engine, "create_engine", _echo_url_and_creator)
@pytest.fixture(scope="function", params=[True, False])
def fxt_single_flag(request):
    """Yield each boolean in turn: once True, once False."""
    flag = request.param
    return flag
@pytest.fixture(scope="function", params=[True, False])
def fxt_double_flag(fxt_single_flag, request):
    """Pair ``fxt_single_flag`` with this fixture's own boolean parameter,
    covering all four True/False combinations."""
    first_flag = fxt_single_flag
    second_flag = request.param
    return first_flag, second_flag
@pytest.fixture(
    scope="function",
    params=[
        [
            ('splines', 'reticulated'),
            ('spam', 'vikings'),
            ('planet', 'zebes'),
            ('widget', 'frobbed'),
        ][:count]
        for count in range(5)
    ],
)
def fxt_extra_kwargs(request):
    """Build a dict from the first 0 to 4 of the sample key/value pairs."""
    return dict(request.param)
@pytest.fixture(
    scope="function",
    params=[
        # Stand-in for a connect() that has no ``uri`` parameter.
        lambda database: database,
        # Stand-in for a connect() that accepts ``uri``.
        lambda database, uri: database,
    ],
)
def fxt_sqlite_conn_creator(monkeypatch, request):
    """Stub out ``sqlite3.connect`` and the ctypes library lookup.

    ``sqlite3.connect`` becomes the parametrized stand-in, which returns its
    first argument. ``ctypes.util.find_library`` returns a fake library
    object exposing ``sqlite3_config``, and ``ctypes.CDLL`` passes that
    object straight through.
    """
    fake_library_type = namedtuple('ctypes_sqlite', ['sqlite3_config'])

    def _always_true(*args):
        return True

    fake_library = fake_library_type(_always_true)

    def _passthrough(handle):
        return handle

    def _find_fake_library(libname):
        return fake_library

    monkeypatch.setattr(sqlite3, "connect", request.param)
    monkeypatch.setattr(ctypes, 'CDLL', _passthrough)
    monkeypatch.setattr(ctypes.util, 'find_library', _find_fake_library)
@pytest.mark.parametrize('inpath,outpath', [(':memory:', 'sqlite:///')])
@pytest.mark.usefixtures("fxt_create_engine")
def test_sqlite_engine_memory(inpath, outpath,
                              fxt_double_flag, fxt_extra_kwargs):
    """An in-memory path yields the bare SQLAlchemy prefix regardless of
    the flags or extra keyword arguments passed."""
    read_only, create_db = fxt_double_flag
    engine_url = create_sqlite_engine(
        inpath,
        read_only=read_only,
        create_db=create_db,
        **fxt_extra_kwargs)
    assert engine_url == outpath
@pytest.mark.parametrize(
    'inpath,outpath',
    [
        ('/Users/JoanSurname/test.db',
         'sqlite:////Users/JoanSurname/test.db'),
        (r'C:\Users\AlanExtra\test.db',
         r'sqlite:///C:\Users\AlanExtra\test.db'),
        ('/home/brig/test.db',
         'sqlite:////home/brig/test.db'),
    ]
)
@pytest.mark.usefixtures("fxt_create_engine")
def test_sqlite_engine_old_sqlite(
        monkeypatch, inpath, outpath,
        fxt_double_flag, fxt_extra_kwargs, fxt_sqlite_conn_creator):
    """With a pre-3.7.7 SQLite, the path is appended to the prefix verbatim
    and the flags are ignored."""
    # Pretend the linked SQLite library is too old for URI filenames.
    monkeypatch.setattr(sqlite3, 'sqlite_version_info', (3, 0, 0))
    read_only, create_db = fxt_double_flag
    engine_url = create_sqlite_engine(
        inpath,
        read_only=read_only,
        create_db=create_db,
        **fxt_extra_kwargs)
    assert engine_url == outpath
@pytest.mark.usefixtures("fxt_sqlite_conn_creator")
@pytest.mark.usefixtures("fxt_create_engine_kwarg")
@pytest.mark.parametrize(
    'inpath,outpath,pathlib',
    [
        (r'C:\Users\Gallant\test.db',
         '/C:/Users/Gallant/test.db',
         ntpath),
        ('/home/gallant/test.db',
         '/home/gallant/test.db',
         posixpath),
        (r'C:\Users\goofus\poorly#named?.db.db',
         '/C:/Users/goofus/poorly%23named%3f.db.db',
         ntpath),
        ('/home/goofus/poorly#named?.db',
         '/home/goofus/poorly%23named%3f.db',
         posixpath),
    ])
def test_sqlite_engine_good_paths(monkeypatch, inpath, outpath, pathlib,
                                  fxt_double_flag, fxt_extra_kwargs):
    """With a modern SQLite, the connection creator receives a ``file:`` URI
    whose path is normalized and whose query carries the mode plus any
    extra keyword arguments."""
    read_only, create_db = fxt_double_flag
    monkeypatch.setattr(sqlite3, 'sqlite_version_info', (3, 8, 10))
    # Swap in the platform-specific path module for this parameter set.
    monkeypatch.setattr(os, 'path', pathlib)

    prefix, conn_creator = create_sqlite_engine(
        inpath,
        create_db=create_db,
        read_only=read_only,
        **fxt_extra_kwargs)
    assert prefix == 'sqlite:///'

    # The stubbed sqlite3.connect returns the URI it was handed.
    parsed_uri = urlparse.urlparse(conn_creator())
    assert parsed_uri.scheme == 'file'
    assert parsed_uri.path == outpath

    # read_only wins over create_db; plain read/write is the default.
    if read_only:
        expected_mode = 'ro'
    elif create_db:
        expected_mode = 'rwc'
    else:
        expected_mode = 'rw'
    fxt_extra_kwargs['mode'] = expected_mode

    actual_query = {}
    for key, values in urlparse.parse_qs(parsed_uri.query).items():
        actual_query[key] = values[0]
    assert fxt_extra_kwargs == actual_query
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment