Skip to content

Instantly share code, notes, and snippets.

@mk-fg
Last active December 25, 2023 07:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mk-fg/bcca57313c725c6ccceeacca959ae3bf to your computer and use it in GitHub Desktop.
Save mk-fg/bcca57313c725c6ccceeacca959ae3bf to your computer and use it in GitHub Desktop.
Simple benchmark for different SQLite python stdlib module options
#!/usr/bin/env python
import contextlib as cl, collections as cs, multiprocessing as mp
import os, sys, random, time, secrets, pathlib as pl
class TestDB:
    """Wrapper around a single sqlite3 connection with read/write test workloads.

    Instances are context managers (closing the connection on exit), and
    calling an instance yields a cursor inside a connection-level transaction
    context: ``with db() as cursor: ...``.
    """

    # _db: open sqlite3 connection, or None once closed.
    # _db_schema: ordered schema scripts; the number of scripts already applied
    # to a database file is tracked there via "pragma user_version".
    _db, _db_schema = None, ['''
create table if not exists data (
id integer not null primary key autoincrement,
datapoint integer not null, some_text text not null );
create unique index if not exists data_points on data (datapoint);''']

    def __init__(self, opts):
        """Open the database and apply any schema scripts it is missing.

        Args:
            opts: mapping that also exposes its keys as attributes. Reads
                `path` and `journal_mode`, plus optional connect() options
                `autocommit`, `isolation_level` and `timeout`.

        Raises:
            RuntimeError: if the database user_version is higher than the
                number of schema scripts known to this class.
        """
        import sqlite3
        self._sqlite = sqlite3
        # None means "use the module default" for these two options.
        conn_kws = {
            k: opts[k] for k in ('autocommit', 'timeout')
            if opts.get(k) is not None}
        # isolation_level=None is a meaningful value for sqlite3.connect()
        # (legacy autocommit mode), so it must be passed through when present
        # instead of being filtered out together with the unset options.
        if 'isolation_level' in opts:
            conn_kws['isolation_level'] = opts['isolation_level']
        self._db = sqlite3.connect(database=opts.path, **conn_kws)
        try:
            with self() as c:
                if opts.journal_mode:
                    c.execute(f'pragma journal_mode={opts.journal_mode}')
                c.execute('pragma user_version')
                sv, sv_chk = c.fetchall()[0][0], len(self._db_schema)
                if sv == sv_chk:
                    return
                if sv > sv_chk:
                    raise RuntimeError(
                        f'DB schema [{sv}] newer than the script [{sv_chk}]')
                # Apply only the scripts past the recorded version, bumping
                # user_version after each one.
                for sv, sql in enumerate(self._db_schema[sv:], sv + 1):
                    for st in sql.split(';'):
                        c.execute(st)
                    c.execute(f'pragma user_version = {sv}')
        except BaseException:
            # Do not leave the connection open when initialisation fails.
            self.close()
            raise

    def close(self):
        """Close the connection; does nothing if it is already closed."""
        if not self._db:
            return
        self._db = self._db.close()  # Connection.close() returns None

    def __enter__(self):
        return self

    def __exit__(self, *err):
        self.close()

    @cl.contextmanager
    def __call__(self):
        """Yield a cursor that is closed on exit, within `with connection:`."""
        with self._db as conn, cl.closing(conn.cursor()) as c:
            yield c

    def test_reads(self, stats):
        """Run one read pass: look up about as many random ids as the max id.

        Updates `stats` (a Counter-like mapping) keys `id_max`, `select` and
        `select_miss` in place, and returns it. An empty table leaves `stats`
        unchanged.
        """
        with self() as c:
            c.execute('select id from data order by id desc limit 1')
            if not ((rows := c.fetchall()) and (id_max := rows[0][0])):
                return stats
            stats['id_max'] = id_max
            for _ in range(id_max):  # query roughly all IDs, in random order
                n = random.randint(0, id_max)
                c.execute(
                    'select datapoint, some_text from data where id = ?', (n,))
                stats['select'] += 1
                if not c.fetchall():
                    stats['select_miss'] += 1
        return stats

    def test_writes(self, stats, update_chance=0.2):
        """Run one write: update a random row or insert a new one.

        Args:
            stats: Counter-like mapping updated in place (`insert`, `update`,
                `update_missing`, `collision`, `id_max`).
            update_chance: probability of doing an update instead of an insert.

        Returns:
            The same `stats` mapping.
        """
        with self() as c:
            data_n = random.randint(0, 2**32 - 1)
            data_text = secrets.token_urlsafe(
                random.randint(3, 30)
                if random.random() > 0.2 else random.randint(30, 100))
            if random.random() < update_chance:
                c.execute('select id from data order by id desc limit 1')
                if not ((rows := c.fetchall()) and (id_max := rows[0][0])):
                    return stats
                stats['update'] += 1
                stats['id_max'] = id_max
                try:
                    c.execute(
                        'update data set datapoint = ?, some_text = ?'
                        ' where id = ? ',
                        (data_n, data_text, random.randint(0, id_max)))
                    if not c.rowcount:
                        stats['update_missing'] += 1
                except self._sqlite.IntegrityError:
                    # "datapoint" has a unique index, random values can clash.
                    stats['collision'] += 1
            else:
                stats['insert'] += 1
                try:
                    c.execute(
                        'insert into data (datapoint,'
                        ' some_text) values (?, ?)', (data_n, data_text))
                except self._sqlite.IntegrityError:
                    stats['collision'] += 1
                else:
                    # lastrowid only describes this insert when it succeeded.
                    stats['id_max'] = c.lastrowid
        return stats
class adict(dict):
    """dict whose items can also be read and written as attributes."""

    def __init__(self, *items, **named):
        super().__init__(*items, **named)
        # Using the dict itself as the instance namespace makes
        # obj.key and obj['key'] refer to the same storage.
        self.__dict__ = self
def test_reader(opts, pipe):
    """Reader process body: loop TestDB.test_reads() and report the counters.

    Runs until time.monotonic() reaches opts.ts_max, or until opts.r_loops
    passes were made (when that is set), then sends a plain dict with the
    counters plus `type`, `pid` and `runtime` keys through `pipe`.
    """
    # Presumably re-seeded so that processes do not share RNG state
    # inherited from the parent - verify against the start method in use.
    random.seed()
    with TestDB(opts.db) as db:
        counts = cs.Counter()
        started = time.monotonic()
        while time.monotonic() < opts.ts_max:
            counts['runs'] += 1
            db.test_reads(counts)
            if opts.r_loops and counts['runs'] >= opts.r_loops:
                break
        report = {
            **counts, 'type': 'reader',
            'pid': str(os.getpid()), 'runtime': time.monotonic() - started}
        pipe.send(report)
def test_writer(opts, pipe):
    """Writer process body: loop TestDB.test_writes() and report the counters.

    Runs until time.monotonic() reaches opts.ts_max, then sends a plain dict
    with the counters plus `type`, `pid` and `runtime` keys through `pipe`.
    """
    # Presumably re-seeded so that processes do not share RNG state
    # inherited from the parent - verify against the start method in use.
    random.seed()
    with TestDB(opts.db) as db:
        counts = cs.Counter()
        started = time.monotonic()
        while time.monotonic() < opts.ts_max:
            counts['runs'] += 1
            db.test_writes(counts)
        report = {
            **counts, 'type': 'writer',
            'pid': str(os.getpid()), 'runtime': time.monotonic() - started}
        pipe.send(report)
def main(argv=None):
    """Parse command-line options, run reader/writer processes, print stats.

    Args:
        argv: argument list to parse; None makes argparse use sys.argv[1:].

    Raises:
        RuntimeError: if a worker process exits without sending its result.
    """
    import argparse, re, textwrap

    def dd(text):
        # Dedent a triple-quoted help string and normalise tabs inside it.
        text = textwrap.dedent(text).strip('\n') + '\n'
        return re.sub(r' \t+', ' ', text).replace('\t', ' ')

    parser = argparse.ArgumentParser(
        usage='%(prog)s [opts] db.sqlite',
        formatter_class=argparse.RawTextHelpFormatter, description=dd('''
Tool to test various sqlite db options and its python wrapper pathologies.
Creates db file at specified argument path and runs requested tests.'''))
    parser.add_argument('db_file', help=dd('''
SQLite database file to create/use. Will be reused as-is, if exists already.'''))
    parser.add_argument('-c', '--create-db', action='store_true', help=dd('''
Create fresh db file, removing the old one, if necessary.'''))
    parser.add_argument('-r', '--rm-db', action='store_true', help=dd('''
Remove specified sqlite db file after test finishes
successfully, i.e. unless interrupted by Ctrl-C or whatever errors.'''))

    group = parser.add_argument_group('Test-run limits/parameters')
    group.add_argument('-t', '--test-duration', type=float, metavar='sec', default=10, help=dd('''
Stop running the test roughly after specified number
of seconds, rounded up to test-loop durations. Default: %(default)ss'''))
    group.add_argument('-n', '--reader-procs', type=int, metavar='n', default=0, help=dd('''
Run looped queries from specified number of "reader" processes,
setup same as writers, just not running any actual insert/update/delete ops.'''))
    group.add_argument('-l', '--reader-loops', type=int, metavar='n', help=dd('''
Run N db-read-loops in every started reader process, instead of using time limit.'''))
    group.add_argument('-m', '--writer-procs', type=int, metavar='n', default=0, help=dd('''
Run looped insert/update queries from specified number of "writer" processes.'''))

    group = parser.add_argument_group(
        'SQLite db/access opts (single-letter ones are all capitalized)')
    group.add_argument('-I', '--sqlite-isolation-level',
        metavar='level', default='deferred', help=dd('''
isolation_level option to use for all sqlite.connect() operations.
One of (case-insensitive): deferred, exclusive, immediate or none. Default: %(default)s'''))
    group.add_argument('-J', '--sqlite-journal-mode', metavar='mode', help=dd('''
Use "pragma journal_mode=..." after opening sqlite database.
Not set by default (not the same as "-J off"!), meaning that previously-set
value will be used, or whatever is sqlite default when creating a new database file.
Can be set to one of (case-insensitive): delete, truncate, persist, memory, wal, off.
See https://www.sqlite.org/pragma.html#pragma_journal_mode for details.'''))
    group.add_argument('-L', '--sqlite-lock-timeout',
        type=float, metavar='sec', default=60, help=dd('''
lock_timeout option to use for sqlite.connect(). Default: %(default)ss'''))
    group.add_argument('-A', '--sqlite-autocommit', metavar='on/off', help=dd('''
Explicitly set python sqlite module "autocommit" behavior.
Can be one of: on, off. Default is to not set the value, i.e. use module default.'''))

    opts = parser.parse_args(argv)
    pn, pm = opts.reader_procs, opts.writer_procs
    if not (pn > 0 or pm > 0):
        parser.error('No number for readers/writers to test specified')

    # Translate on/off into a bool, leaving None as "use module default".
    # Bad values get a usage error instead of a ValueError traceback.
    autocommit = opts.sqlite_autocommit
    if autocommit is not None:
        if autocommit.lower() not in ('on', 'off'):
            parser.error(
                'Invalid -A/--sqlite-autocommit value'
                f' (must be "on" or "off"): {autocommit!r}')
        autocommit = autocommit.lower() == 'on'

    test_opts = adict(
        db=adict(
            path=pl.Path(opts.db_file), autocommit=autocommit,
            isolation_level=opts.sqlite_isolation_level.upper(),
            journal_mode=(jm := opts.sqlite_journal_mode) and jm.upper(),
            timeout=opts.sqlite_lock_timeout),
        ts_max=0, r_loops=opts.reader_loops)
    if test_opts.db.isolation_level == 'NONE':
        test_opts.db.isolation_level = None

    if opts.create_db:
        test_opts.db.path.unlink(missing_ok=True)
    with TestDB(test_opts.db):
        pass  # check access/options

    # Every worker sends one result dict through the same pipe.
    pipe, pipe_send = mp.Pipe()
    procs, totals, ts0 = [], cs.Counter(), time.monotonic()
    test_opts.ts_max = time.monotonic() + opts.test_duration
    for func, name_tpl, n_proc in [
            (test_writer, 'writer.{}', pm), (test_reader, 'reader.{}', pn)]:
        for n in range(n_proc):
            p = mp.Process(
                target=func, args=(test_opts, pipe_send),
                name=name_tpl.format(n), daemon=True)
            procs.append(p)
            p.start()

    results = []
    for p in procs:
        p.join()
        if not pipe.poll():
            raise RuntimeError('Process exited without sending result')
        results.append(pipe.recv())
    totals['runtime'] = time.monotonic() - ts0

    def fmt_vals(d):
        # Render key=value pairs, with thousands separators for numbers.
        for k, v in d.items():
            if isinstance(v, int): yield f'{k}={v:,d}'
            elif isinstance(v, float): yield f'{k}={v:,.2f}'.removesuffix('.00')
            else: yield f'{k}={v}'

    for stats in results:
        st_pid, st_t = (stats.pop(k) for k in 'pid type'.split())
        totals.update({k: stats.get(k, 0) for k in ('select', 'insert', 'update')})
        totals[f'mean_runtime_{st_t}'] += stats['runtime']
        totals[f'n_{st_t}'] += 1
        print(f'- pid {st_pid} {st_t} ::', ' '.join(fmt_vals(stats)))
    for st_t in ('reader', 'writer'):
        # A run with only readers or only writers has no processes of the
        # other type; skip it instead of dividing by zero.
        if not (n_type := totals[f'n_{st_t}']):
            continue
        totals[f'mean_runtime_{st_t}'] = totals[f'mean_runtime_{st_t}'] / n_type
    print('Totals:', ' '.join(fmt_vals(totals)))

    if opts.rm_db:
        test_opts.db.path.unlink(missing_ok=True)
# Script entry point: run main() and pass its return value to sys.exit().
if __name__ == '__main__': sys.exit(main())
% ./sqlite-test.py -c /mnt/old-25in-hdd/test.sqlite -t 3 -n 6 -m 6 -J wal
- pid 88791 reader :: runs=66,820 id_max=42 select=581,628 runtime=2.66
- pid 88790 reader :: runs=68,140 id_max=42 select=588,633 runtime=2.66
- pid 88787 reader :: runs=69,187 id_max=42 select=591,769 runtime=2.66
- pid 88788 reader :: runs=68,129 id_max=42 select=587,302 runtime=2.66
- pid 88789 reader :: runs=68,280 id_max=42 select=590,269 runtime=2.66
- pid 88792 reader :: runs=68,109 id_max=42 select=589,858 runtime=2.66
- pid 88781 writer :: runs=56 insert=43 id_max=43 update=13 update_missing=2 runtime=2.69
- pid 88782 writer :: runs=1 insert=1 id_max=44 runtime=2.78
- pid 88785 writer :: runs=2 insert=1 id_max=45 runtime=2.88
- pid 88784 writer :: runs=2 insert=1 id_max=46 runtime=2.98
- pid 88783 writer :: runs=1 insert=1 id_max=47 runtime=3.09
- pid 88786 writer :: runs=1 insert=1 id_max=48 runtime=3.21
Totals: runtime=3.25 select=3,529,459 insert=48 update=13 median_runtime_reader=2.66 n_reader=6 median_runtime_writer=2.94 n_writer=6
% ./sqlite-test.py -h
usage: sqlite-test.py [opts] db.sqlite
Tool to test for various sqlite db and its python wrapper pathologies.
Creates db file at specified argument path and runs requested tests.
positional arguments:
db_file SQLite database file to create/use. Will be reused as-is, if exists already.
options:
-h, --help show this help message and exit
-c, --create-db Create fresh db file, removing the old one, if necessary.
Test-run limits/parameters:
-t sec, --test-duration sec
Stop running the test roughly after specified number
of seconds, rounded up to test-loop durations. Default: 10s
-n n, --reader-procs n
Run looped queries from specified number of "reader" processes,
setup same as writers, just not running any actual insert/update/delete ops.
-l n, --reader-loops n
Run N db-read-loops in every started reader process, instead of using time limit.
-m n, --writer-procs n
Run looped insert/update queries from specified number of "writer" processes.
SQLite db options (single-letter ones are all capitalized):
-I level, --sqlite-isolation-level level
isolation_level option to use for all sqlite.connect() operations.
One of (case-insensitive): deferred, exclusive, immediate or none. Default: deferred
-J mode, --sqlite-journal-mode mode
Use "pragma journal_mode=..." after opening sqlite database.
Not set by default (not the same as "-W off"!), meaning that previously-set
value will be used, or whatever is sqlite default when creating a new database file.
Can be set to one of (case-insensitive): delete, truncate, persist, memory, wal, off.
See https://www.sqlite.org/pragma.html#pragma_journal_mode for details.
-L sec, --sqlite-lock-timeout sec
lock_timeout option to use for sqlite.connect(). Default: 60s
-A on/off, --sqlite-autocommit on/off
Explicitly set python sqlite module "autocommit" behavior.
Can be one of: on, off. Default is to not set the value, i.e. use module default.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment