Last active
September 19, 2016 22:58
-
-
Save brighid/42ed997016973e16bd04df22bfd638f7 to your computer and use it in GitHub Desktop.
A function for passing connection parameters to SQLite in the context of SQLAlchemy. See http://stronglyemergent.com/blog/2016/how-to-make-sqlalchemy-pass-connection-flags-to-sqlite/ for explanation.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os.path | |
import re | |
import sqlite3 | |
from sqlalchemy import create_engine | |
def create_sqlite_engine(sqlite_path, | |
read_only=None, create_db=None, **kwargs): | |
""" | |
Open SQLite with appropriate use of connection flags. | |
This is harder than it sounds. | |
""" | |
# Requirements: sqlite_path must be either ':memory:' or the output of | |
# os.path.abspath(). In this context, the SQLitePathType class handles that | |
# for us. | |
log.info("Trying to connect to SQLite ({!r})".format(sqlite_path)) | |
sqlite_prefix = 'sqlite:///' | |
# For an in-memory DB, ignore the flags | |
if sqlite_path == ':memory:': | |
log.info("Fallback: :memory: DB.") | |
return sqlalchemy.engine.create_engine(sqlite_prefix) | |
# For atrociously old versions of SQLite, ignore the flags | |
if (sqlite3.sqlite_version_info < (3, 7, 7)): | |
log.warning(("Found old SQLite version: " | |
"using it, but have to ignore {!r}").format(kwargs)) | |
return sqlalchemy.engine.create_engine(sqlite_prefix + sqlite_path) | |
kwargs.pop('mode', None) | |
if create_db: | |
kwargs['mode'] = 'rwc' | |
else: | |
kwargs['mode'] = 'rw' | |
# Because the read_only and create_db modes are mutually exclusive, we give | |
# priority to the more restrictive mode. | |
if read_only: | |
kwargs['mode'] = 'ro' | |
# Normalize sqlite_path from a platform-bound file path to a | |
# platform-agnostic file:// URI (see https://www.sqlite.org/uri.html for | |
# details of SQLite's file:// URI requirements). | |
sqlite_path = re.sub(r'\?', r'%3f', sqlite_path) | |
sqlite_path = re.sub('#', r'%23', sqlite_path) | |
drive, sqlite_path = os.path.splitdrive(sqlite_path) | |
path_segments = [] | |
path_head, path_tail = os.path.split(sqlite_path) | |
while path_tail != '': | |
path_segments = [path_tail] + path_segments | |
path_head, path_tail = os.path.split(path_head) | |
if drive != '': | |
path_segments = [drive] + path_segments | |
try: | |
# Python 3 | |
query_string = urlparse.urlencode(kwargs) | |
except AttributeError: | |
# Python 2 | |
query_string = urllib.urlencode(kwargs) | |
file_uri = urlparse.urlunparse( | |
("file", # Scheme | |
'', # Netloc (localhost) | |
'/'.join(path_segments), # Path | |
"", # Params | |
query_string, # Query | |
"", # Fragment | |
)) | |
log.debug("Database URI: {!r}".format(file_uri)) | |
def _sqlite_conn_creator(): | |
try: | |
log.debug("Trying the easy way...") | |
# In Python 3.4+, we can do this the easy way. | |
conn = sqlite3.connect(file_uri, uri=True) | |
except TypeError: | |
log.debug("Trying ctypes hackage...") | |
# The ctypes way is not the easy way (nor is it the "compatible | |
# with anything but CPython" way). | |
import ctypes | |
import ctypes.util | |
sqlite = ctypes.CDLL(ctypes.util.find_library('sqlite3')) | |
# https://www.sqlite.org/c3ref/c_config_covering_index_scan.html | |
SQLITE_CONFIG_URI = 17 | |
sqlite.sqlite3_config(SQLITE_CONFIG_URI, 1) | |
conn = sqlite3.connect(file_uri) | |
return conn | |
# Dust our hands off, give the caller what they asked for, pretend we | |
# didn't do anything disreputable just now. | |
return sqlalchemy.engine.create_engine(sqlite_prefix, | |
creator=_sqlite_conn_creator) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import namedtuple | |
import ctypes | |
import ctypes.util | |
import os | |
import ntpath | |
import posixpath | |
import sqlite3 | |
try: | |
# Python 3 | |
import urllib.parse as urlparse | |
except ImportError: | |
# Python 2 | |
import urlparse | |
import pytest | |
import sqlalchemy.engine | |
from create_sqlite_engine import create_sqlite_engine | |
@pytest.fixture(scope="function") | |
def fxt_create_engine(monkeypatch): | |
monkeypatch.setattr(sqlalchemy.engine, "create_engine", | |
lambda path: path) | |
@pytest.fixture(scope="function") | |
def fxt_create_engine_kwarg(monkeypatch): | |
monkeypatch.setattr(sqlalchemy.engine, "create_engine", | |
lambda path, creator=None: (path, creator)) | |
@pytest.fixture(scope="function", params=[True, False]) | |
def fxt_single_flag(request): | |
return request.param | |
@pytest.fixture(scope="function", params=[True, False]) | |
def fxt_double_flag(fxt_single_flag, request): | |
return fxt_single_flag, request.param | |
@pytest.fixture(scope="function", | |
params=[[('splines', 'reticulated'), ('spam', 'vikings'), | |
('planet', 'zebes'), ('widget', 'frobbed')][:i] | |
for i in range(5)]) | |
def fxt_extra_kwargs(request): | |
return {k: v for k, v in request.param} | |
@pytest.fixture(scope="function", | |
params=[lambda file: file, | |
lambda file, uri: file]) | |
def fxt_sqlite_conn_creator(monkeypatch, request): | |
monkeypatch.setattr(sqlite3, "connect", request.param) | |
monkeypatch.setattr(ctypes, 'CDLL', lambda a: a) | |
ctypes_sqlite = namedtuple('ctypes_sqlite', | |
['sqlite3_config'])(lambda *a: True) | |
find_lib = lambda libname: ctypes_sqlite | |
monkeypatch.setattr(ctypes.util, 'find_library', find_lib) | |
@pytest.mark.parametrize('inpath,outpath', [(':memory:', 'sqlite:///')]) | |
@pytest.mark.usefixtures("fxt_create_engine") | |
def test_sqlite_engine_memory(inpath, outpath, | |
fxt_double_flag, fxt_extra_kwargs): | |
read_only, create_db = fxt_double_flag | |
assert create_sqlite_engine(inpath, | |
read_only=read_only, | |
create_db=create_db, | |
**fxt_extra_kwargs) == outpath | |
@pytest.mark.parametrize( | |
'inpath,outpath', | |
[('/Users/JoanSurname/test.db', 'sqlite:////Users/JoanSurname/test.db'), | |
(r'C:\Users\AlanExtra\test.db', r'sqlite:///C:\Users\AlanExtra\test.db'), | |
('/home/brig/test.db', 'sqlite:////home/brig/test.db')] | |
) | |
@pytest.mark.usefixtures("fxt_create_engine") | |
def test_sqlite_engine_old_sqlite( | |
monkeypatch, inpath, outpath, | |
fxt_double_flag, fxt_extra_kwargs, fxt_sqlite_conn_creator): | |
read_only, create_db = fxt_double_flag | |
monkeypatch.setattr(sqlite3, 'sqlite_version_info', (3, 0, 0)) | |
assert create_sqlite_engine( | |
inpath, | |
read_only=read_only, | |
create_db=create_db, | |
**fxt_extra_kwargs) == outpath | |
@pytest.mark.usefixtures("fxt_sqlite_conn_creator") | |
@pytest.mark.usefixtures("fxt_create_engine_kwarg") | |
@pytest.mark.parametrize( | |
'inpath,outpath,pathlib', [ | |
(r'C:\Users\Gallant\test.db', '/C:/Users/Gallant/test.db', ntpath), | |
('/home/gallant/test.db', '/home/gallant/test.db', posixpath), | |
(r'C:\Users\goofus\poorly#named?.db.db', | |
'/C:/Users/goofus/poorly%23named%3f.db.db', ntpath), | |
('/home/goofus/poorly#named?.db', | |
'/home/goofus/poorly%23named%3f.db', posixpath)]) | |
def test_sqlite_engine_good_paths(monkeypatch, inpath, outpath, pathlib, | |
fxt_double_flag, fxt_extra_kwargs): | |
read_only, create_db = fxt_double_flag | |
monkeypatch.setattr(sqlite3, 'sqlite_version_info', (3, 8, 10)) | |
monkeypatch.setattr(os, 'path', pathlib) | |
prefix, conn_creator = create_sqlite_engine( | |
inpath, | |
create_db=create_db, | |
read_only=read_only, | |
**fxt_extra_kwargs) | |
assert prefix == 'sqlite:///' | |
scheme, _, path, __, query, ___ = urlparse.urlparse(conn_creator()) | |
assert scheme == 'file' | |
assert path == outpath | |
if create_db: | |
fxt_extra_kwargs['mode'] = 'rwc' | |
else: | |
fxt_extra_kwargs['mode'] = 'rw' | |
if read_only: | |
fxt_extra_kwargs['mode'] = 'ro' | |
assert fxt_extra_kwargs == {k: v[0] for k, v in | |
urlparse.parse_qs(query).items()} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment