Skip to content

Instantly share code, notes, and snippets.

@brighid
Last active September 19, 2016 22:58
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save brighid/42ed997016973e16bd04df22bfd638f7 to your computer and use it in GitHub Desktop.
A function for passing connection parameters to SQLite in the context of SQLAlchemy. See http://stronglyemergent.com/blog/2016/how-to-make-sqlalchemy-pass-connection-flags-to-sqlite/ for explanation.
import os.path
import re
import sqlite3
from sqlalchemy import create_engine
def create_sqlite_engine(sqlite_path,
read_only=None, create_db=None, **kwargs):
"""
Open SQLite with appropriate use of connection flags.
This is harder than it sounds.
"""
# Requirements: sqlite_path must be either ':memory:' or the output of
# os.path.abspath(). In this context, the SQLitePathType class handles that
# for us.
log.info("Trying to connect to SQLite ({!r})".format(sqlite_path))
sqlite_prefix = 'sqlite:///'
# For an in-memory DB, ignore the flags
if sqlite_path == ':memory:':
log.info("Fallback: :memory: DB.")
return sqlalchemy.engine.create_engine(sqlite_prefix)
# For atrociously old versions of SQLite, ignore the flags
if (sqlite3.sqlite_version_info < (3, 7, 7)):
log.warning(("Found old SQLite version: "
"using it, but have to ignore {!r}").format(kwargs))
return sqlalchemy.engine.create_engine(sqlite_prefix + sqlite_path)
kwargs.pop('mode', None)
if create_db:
kwargs['mode'] = 'rwc'
else:
kwargs['mode'] = 'rw'
# Because the read_only and create_db modes are mutually exclusive, we give
# priority to the more restrictive mode.
if read_only:
kwargs['mode'] = 'ro'
# Normalize sqlite_path from a platform-bound file path to a
# platform-agnostic file:// URI (see https://www.sqlite.org/uri.html for
# details of SQLite's file:// URI requirements).
sqlite_path = re.sub(r'\?', r'%3f', sqlite_path)
sqlite_path = re.sub('#', r'%23', sqlite_path)
drive, sqlite_path = os.path.splitdrive(sqlite_path)
path_segments = []
path_head, path_tail = os.path.split(sqlite_path)
while path_tail != '':
path_segments = [path_tail] + path_segments
path_head, path_tail = os.path.split(path_head)
if drive != '':
path_segments = [drive] + path_segments
try:
# Python 3
query_string = urlparse.urlencode(kwargs)
except AttributeError:
# Python 2
query_string = urllib.urlencode(kwargs)
file_uri = urlparse.urlunparse(
("file", # Scheme
'', # Netloc (localhost)
'/'.join(path_segments), # Path
"", # Params
query_string, # Query
"", # Fragment
))
log.debug("Database URI: {!r}".format(file_uri))
def _sqlite_conn_creator():
try:
log.debug("Trying the easy way...")
# In Python 3.4+, we can do this the easy way.
conn = sqlite3.connect(file_uri, uri=True)
except TypeError:
log.debug("Trying ctypes hackage...")
# The ctypes way is not the easy way (nor is it the "compatible
# with anything but CPython" way).
import ctypes
import ctypes.util
sqlite = ctypes.CDLL(ctypes.util.find_library('sqlite3'))
# https://www.sqlite.org/c3ref/c_config_covering_index_scan.html
SQLITE_CONFIG_URI = 17
sqlite.sqlite3_config(SQLITE_CONFIG_URI, 1)
conn = sqlite3.connect(file_uri)
return conn
# Dust our hands off, give the caller what they asked for, pretend we
# didn't do anything disreputable just now.
return sqlalchemy.engine.create_engine(sqlite_prefix,
creator=_sqlite_conn_creator)
from collections import namedtuple
import ctypes
import ctypes.util
import os
import ntpath
import posixpath
import sqlite3
try:
# Python 3
import urllib.parse as urlparse
except ImportError:
# Python 2
import urlparse
import pytest
import sqlalchemy.engine
from create_sqlite_engine import create_sqlite_engine
@pytest.fixture(scope="function")
def fxt_create_engine(monkeypatch):
monkeypatch.setattr(sqlalchemy.engine, "create_engine",
lambda path: path)
@pytest.fixture(scope="function")
def fxt_create_engine_kwarg(monkeypatch):
monkeypatch.setattr(sqlalchemy.engine, "create_engine",
lambda path, creator=None: (path, creator))
@pytest.fixture(scope="function", params=[True, False])
def fxt_single_flag(request):
return request.param
@pytest.fixture(scope="function", params=[True, False])
def fxt_double_flag(fxt_single_flag, request):
return fxt_single_flag, request.param
@pytest.fixture(scope="function",
params=[[('splines', 'reticulated'), ('spam', 'vikings'),
('planet', 'zebes'), ('widget', 'frobbed')][:i]
for i in range(5)])
def fxt_extra_kwargs(request):
return {k: v for k, v in request.param}
@pytest.fixture(scope="function",
params=[lambda file: file,
lambda file, uri: file])
def fxt_sqlite_conn_creator(monkeypatch, request):
monkeypatch.setattr(sqlite3, "connect", request.param)
monkeypatch.setattr(ctypes, 'CDLL', lambda a: a)
ctypes_sqlite = namedtuple('ctypes_sqlite',
['sqlite3_config'])(lambda *a: True)
find_lib = lambda libname: ctypes_sqlite
monkeypatch.setattr(ctypes.util, 'find_library', find_lib)
@pytest.mark.parametrize('inpath,outpath', [(':memory:', 'sqlite:///')])
@pytest.mark.usefixtures("fxt_create_engine")
def test_sqlite_engine_memory(inpath, outpath,
fxt_double_flag, fxt_extra_kwargs):
read_only, create_db = fxt_double_flag
assert create_sqlite_engine(inpath,
read_only=read_only,
create_db=create_db,
**fxt_extra_kwargs) == outpath
@pytest.mark.parametrize(
'inpath,outpath',
[('/Users/JoanSurname/test.db', 'sqlite:////Users/JoanSurname/test.db'),
(r'C:\Users\AlanExtra\test.db', r'sqlite:///C:\Users\AlanExtra\test.db'),
('/home/brig/test.db', 'sqlite:////home/brig/test.db')]
)
@pytest.mark.usefixtures("fxt_create_engine")
def test_sqlite_engine_old_sqlite(
monkeypatch, inpath, outpath,
fxt_double_flag, fxt_extra_kwargs, fxt_sqlite_conn_creator):
read_only, create_db = fxt_double_flag
monkeypatch.setattr(sqlite3, 'sqlite_version_info', (3, 0, 0))
assert create_sqlite_engine(
inpath,
read_only=read_only,
create_db=create_db,
**fxt_extra_kwargs) == outpath
@pytest.mark.usefixtures("fxt_sqlite_conn_creator")
@pytest.mark.usefixtures("fxt_create_engine_kwarg")
@pytest.mark.parametrize(
'inpath,outpath,pathlib', [
(r'C:\Users\Gallant\test.db', '/C:/Users/Gallant/test.db', ntpath),
('/home/gallant/test.db', '/home/gallant/test.db', posixpath),
(r'C:\Users\goofus\poorly#named?.db.db',
'/C:/Users/goofus/poorly%23named%3f.db.db', ntpath),
('/home/goofus/poorly#named?.db',
'/home/goofus/poorly%23named%3f.db', posixpath)])
def test_sqlite_engine_good_paths(monkeypatch, inpath, outpath, pathlib,
fxt_double_flag, fxt_extra_kwargs):
read_only, create_db = fxt_double_flag
monkeypatch.setattr(sqlite3, 'sqlite_version_info', (3, 8, 10))
monkeypatch.setattr(os, 'path', pathlib)
prefix, conn_creator = create_sqlite_engine(
inpath,
create_db=create_db,
read_only=read_only,
**fxt_extra_kwargs)
assert prefix == 'sqlite:///'
scheme, _, path, __, query, ___ = urlparse.urlparse(conn_creator())
assert scheme == 'file'
assert path == outpath
if create_db:
fxt_extra_kwargs['mode'] = 'rwc'
else:
fxt_extra_kwargs['mode'] = 'rw'
if read_only:
fxt_extra_kwargs['mode'] = 'ro'
assert fxt_extra_kwargs == {k: v[0] for k, v in
urlparse.parse_qs(query).items()}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment