@lemon24
Last active February 14, 2024 14:28
Python sqlite3 server using multiprocessing.managers

This gist tracks the creation of an almost fully functional sqlite3 server in Python 3, using only the multiprocessing.managers standard library module.

But why?

  • To see if it can be done.
  • To learn about multiprocessing managers.
  • Aside from whole-database locking, SQLite (maybe, see below) supports table-level locking between multiple connections to the same database, as long as the connections live in the same process. A "SQLite server" makes it possible for users in different processes to use SQLite connections that all live in the same (server) process.

A more-than-basic prototype takes less than 100 lines of normally-formatted code. A prototype that can be used as a drop-in replacement in reader takes about 200 lines of code (111 statements, as measured by coverage.py).

See the client_one() and client_two() functions in 08-manager-db-everything.py for all the supported features; the code runs the same with a vanilla sqlite3 connection or a proxy connection.
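
For reference, here is a rough sketch of how the pieces are meant to fit together, based on the make_manager_cls() and connect() helpers from 08-manager-db-everything.py (the module name, address, authkey, and database path below are placeholders):

    # --- server process ---
    import sqliteserver  # assuming 08-manager-db-everything.py is importable under this name

    manager = sqliteserver.make_manager_cls()(('127.0.0.1', 5000), b'secret')
    manager.get_server().serve_forever()  # or manager.start() ... manager.shutdown()

    # --- client process(es) ---
    db = sqliteserver.connect(('127.0.0.1', 5000), b'secret', '/tmp/db.sqlite')
    with db:
        db.execute("create table if not exists t(a)")
        db.execute("insert into t values (?)", (1,))
    print(list(db.execute("select * from t")))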

See 99-output.txt for a performance comparison.

  • The 08-manager-db-everything.py time numbers show the proxy connection / vanilla connection ratio for the same operation. The len(l) benchmark just gets the length of a remote or local list, to establish a baseline for the overhead induced by IPC.
  • The 09-reader.py numbers show the same ratio for some reader v1.2 methods; depending on how much work the query does, the ratio ranges from 1.14x to 50x.
    • This comment shows a bench.py diff for get_entries() / update_feeds() / search_entries() / update_search(); the remote/local ratio ranges from 1x to 5x.

Update:

Whether table-level locking actually works as described above is debatable...

  • This 2008 thread implies it does.
  • This 2010 thread implies it does.
  • This 2011 thread summarizes some previous threads, but concludes it doesn't (and it's the same person from the 2008 thread).
  • In this 2015 thread, someone says they actually got it working (but with some other things working differently).
  • This 2016 thread implies it doesn't.

I couldn't get it to work even for normal ("serverless") sqlite3; see 10-table-level-locking.py for the code.

On an unrelated note, it's possible to achieve table-level locking by using attached databases with one table per database (one, two "Improved concurrency"). That's most likely what I was trying to achieve here. ...But if you use WAL, you lose atomic transactions across multiple tables.
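
For illustration, a rough sketch of the attached-databases approach (untested; the file and table names below are made up):

    import sqlite3

    db = sqlite3.connect('/tmp/main.sqlite')
    # each table gets its own database file, so locking happens per file, i.e. per table
    db.execute("attach database '/tmp/feeds.sqlite' as feeds")
    db.execute("attach database '/tmp/entries.sqlite' as entries")
    db.execute("create table if not exists feeds.feeds(url)")
    db.execute("create table if not exists entries.entries(id, feed)")
    # the caveat above: with journal_mode=WAL, a transaction spanning
    # both attached databases is no longer atomic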

"""
In which we see how multiprocessing listeners and clients work.
https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.connection
"""
import time
import socket
from multiprocessing.connection import Listener, Client
from threading import Thread
def log(*args):
# join before printing to avoid overlapping lines
print(' '.join(map(str, args)) + '\n', end='')
ADDR = ('127.0.0.1', 5000)
# Somehow, the default backlog of 1 transiently raises BrokenPipeError
# on the first send() of one of the clients. Any other value does not.
#
# This only happens on macOS; I was not able to reproduce it on Linux.
# It happens both on the single and the multi-threaded servers.
#
# It may have to do with differences in how connections are queued:
#
# https://stackoverflow.com/questions/19626527/mac-osx-10-9-listen-backlog-works-not-properly
# https://github.com/selfboot/AnnotatedShadowSocks/issues/18
#
# Doing that tcpdump thing in the first link may be an interesting exercise.
BACKLOG = 5
def run_server():
with Listener(ADDR, backlog=BACKLOG) as listener:
while True:
conn = listener.accept()
log('single server got connection', listener.last_accepted)
server_worker(conn)
def multi_run_server():
with Listener(ADDR, backlog=BACKLOG) as listener:
while True:
conn = listener.accept()
log('multi server got connection', listener.last_accepted)
Thread(target=server_worker, args=(conn,)).start()
def server_worker(conn):
with conn:
while True:
try:
rv = conn.recv()
log('server got value', rv)
except EOFError:
log('server got EOFError')
break
conn.send(rv.upper())
NUMBERS = 'one two three four five six seven eight nine ten'.split()
def client(i, n):
with Client(ADDR) as conn:
sock = socket.socket(fileno=conn.fileno())
addr = sock.getsockname()
sock.detach()
log('client', i, 'connected', addr)
for number in NUMBERS[:n]:
try:
msg = f'{i}+{number}'
conn.send(msg)
log('client', i, 'sent', msg)
except OSError as e:
log('client', i, 'got error during send', type(e).__name__, e)
return
try:
log('client', i, 'got', conn.recv())
except OSError as e:
log('client', i, 'got error during recv', type(e).__name__, e)
return
time.sleep(.1)
Thread(target=run_server, daemon=True).start()
time.sleep(.1)
for i, n in enumerate((2, 2, 3, 4)):
Thread(target=client, args=(i, n)).start()
"""
In which we use multiprocessing listeners and clients
to run queries on a single remote SQLite connection.
"""
import time
from multiprocessing.connection import Listener, Client
from threading import Thread
import sqlite3
def log(*args):
# join before printing to avoid overlapping lines
print(' '.join(map(str, args)) + '\n', end='')
ADDR = ('127.0.0.1', 5000)
BACKLOG = 5
def run_server(path):
db = sqlite3.connect(path)
with Listener(ADDR, backlog=BACKLOG) as listener:
while True:
conn = listener.accept()
server_worker(db, conn)
def server_worker(db, conn):
with conn:
while True:
try:
op, *values = conn.recv()
if op == 'execute':
for row in db.execute(*values):
conn.send(('row', row))
conn.send(('end',))
else:
assert False, "unexpected op: %s" % op
except EOFError:
break
class DBConn:
def __init__(self):
self.conn = Client(ADDR)
def execute(self, query, params):
self.conn.send(('execute', query, params))
def rows():
while True:
# TODO: wrap EOFError
op, *values = self.conn.recv()
if op == 'row':
yield values[0]
elif op == 'end':
return
else:
assert False, "unexpected op: %s" % op
return rows()
def __enter__(self):
self.conn.__enter__()
return self
def __exit__(self, *args):
return self.conn.__exit__(*args)
def run_client(i, queries):
with DBConn() as db:
for iq, (query, params) in enumerate(queries):
for row in db.execute(query, params):
log('client', i, 'query', iq, row)
time.sleep(.1)
Thread(target=run_server, args=(':memory:',), daemon=True).start()
time.sleep(.1)
for i, q in enumerate([
[('select 1;', ()), ('select 2, 3;', ())],
[('values (4, 5), (6, 7);', ())],
[('select ?;', (8,)), ('select :value;', {'value': 9})],
]):
Thread(target=run_client, args=(i, q)).start()
"""
In which we use multiprocessing listeners and clients
to run queries on multiple remote SQLite connections to the same database.
"""
import os
import time
from multiprocessing.connection import Listener, Client
from threading import Thread
import sqlite3
def log(*args):
# join before printing to avoid overlapping lines
print(' '.join(map(str, args)) + '\n', end='')
ADDR = ('127.0.0.1', 5000)
BACKLOG = 5
def run_server(path):
with Listener(ADDR, backlog=BACKLOG) as listener:
while True:
conn = listener.accept()
Thread(target=server_worker, args=(path, conn)).start()
def server_worker(path, conn):
db = sqlite3.connect(path)
try:
with conn:
while True:
try:
op, *values = conn.recv()
if op == 'execute':
# transactions should happen with a different keyword
if values[0].startswith('insert'):
db.__enter__()
for row in db.execute(*values):
conn.send(('row', row))
conn.send(('end',))
if values[0].startswith('insert'):
db.__exit__(None,None,None)
else:
assert False, "unexpected op: %s" % op
except EOFError:
break
finally:
db.close()
class DBConn:
def __init__(self):
self.conn = Client(ADDR)
def execute(self, query, params):
self.conn.send(('execute', query, params))
def rows():
while True:
# TODO: wrap EOFError
op, *values = self.conn.recv()
if op == 'row':
yield values[0]
elif op == 'end':
return
else:
assert False, "unexpected op: %s" % op
return rows()
def __enter__(self):
self.conn.__enter__()
return self
def __exit__(self, *args):
return self.conn.__exit__(*args)
def run_client(i, queries):
with DBConn() as db:
log('client', i, 'started')
for iq, (query, params) in enumerate(queries):
for row in db.execute(query, params):
log('client', i, 'query', iq, row)
time.sleep(.1)
log('client', i, 'done')
PATH = '/tmp/s3c.sqlite'
os.remove(PATH)
Thread(target=run_server, args=(PATH,), daemon=True).start()
time.sleep(.1)
with DBConn() as db:
# we should not be required to call list() on the result;
# also, we should not require empty params
list(db.execute('create table t(a, b);', ()))
time.sleep(.1)
for i, q in enumerate([
[("insert into t values (?, 'zero');", (0,)), ('select * from t;', ())],
[("insert into t values (1, 'one'), (:i, 'two');", {'i': 2}), ('select * from t;', ())],
[("insert into t values (3, 'three');", ())],
[('select * from t;', ())],
[('select b from t where b like ?;', ('t%',))],
]):
time.sleep(.1)
Thread(target=run_client, args=(i, q)).start()
"""
In which we see how multiprocessing managers work.
https://docs.python.org/3/library/multiprocessing.html#managers
"""
import os
import time
from dataclasses import dataclass
from multiprocessing.managers import BaseManager, BaseProxy
from threading import Thread
import threading
def log(*args):
# join before printing to avoid overlapping lines
print(' '.join(map(str, args)) + '\n', end='')
ADDR = ('127.0.0.1', 5000)
@dataclass
class Dummy:
arg: str
def status(self):
return (
f"hello from {self!r} {id(self)} in process {os.getpid()}, "
f"thread {threading.get_ident()}"
)
def set_arg(self, arg):
self.arg = arg
def make_server_manager():
dummies = dict()
def get_dummy(name):
try:
return dummies[name]
except KeyError:
dummy = Dummy('default')
dummies[name] = dummy
return dummy
Manager = type('Manager', (BaseManager,), {})
Manager.register('get_dummy', get_dummy)
return Manager
def make_client_manager():
Manager = type('Manager', (BaseManager,), {})
Manager.register('get_dummy')
return Manager
def client_one():
manager = make_client_manager()(ADDR)
manager.connect()
dummy = manager.get_dummy('one')
log(f"client one:\n {dummy!r}\n {dummy.status()}\n")
time.sleep(.2)
log(f"client one:\n {dummy!r}\n {dummy.status()}\n")
def client_two():
manager = make_client_manager()(ADDR)
manager.connect()
time.sleep(.1)
dummy = manager.get_dummy('one')
log(f"client two:\n {dummy!r}\n {dummy.status()}\n")
dummy.set_arg('xxx')
manager = make_server_manager()(ADDR)
manager.start()
log(Dummy('manager process').status(), '\n')
Thread(target=client_one).start()
Thread(target=client_two).start()
"""
In which we use multiprocessing managers and proxies
to run queries on multiple remote SQLite connections to the same database.
"""
import os
import time
from multiprocessing.managers import BaseManager, BaseProxy
from threading import Thread
import threading
import sqlite3
def log(*args):
# join before printing to avoid overlapping lines
print(' '.join(map(str, args)) + '\n', end='')
ADDR = ('127.0.0.1', 5000)
PATH = '/tmp/s3c.sqlite'
class Connection(sqlite3.Connection):
def _enter(self):
# unlike __enter__, we do not return the connection (it can't be pickled)
self.__enter__()
class ConnectionProxy(BaseProxy):
_exposed_ = ('execute', '__enter__', '__exit__', '_enter', 'close', 'create_function')
_method_to_typeid_ = {
'execute': 'Cursor',
}
# FIXME: find a way to autogenerate (some) of these
def execute(self, *args):
return self._callmethod('execute', args)
def __enter__(self):
self._callmethod('_enter')
return self
def __exit__(self, *args):
return self._callmethod('__exit__', args)
def close(self, *args):
return self._callmethod('close', args)
def create_function(self, *args):
return self._callmethod('create_function', args)
class CursorProxy(BaseProxy):
_exposed_ = ('__next__',)
def __iter__(self):
return self
# FIXME: find a way to autogenerate these
def __next__(self):
return self._callmethod('__next__')
def make_manager_cls(*args, **kwargs):
def get_db():
return sqlite3.connect(
*args,
factory=Connection,
check_same_thread=False,
**kwargs,
)
Manager = type('Manager', (BaseManager,), {})
Manager.register('Connection', get_db, proxytype=ConnectionProxy)
Manager.register('Cursor', proxytype=CursorProxy, create_method=False)
return Manager
def connect(addr):
manager = make_manager_cls()(addr)
manager.connect()
db = manager.Connection()
return db
def client_one(connect, connect_arg):
log(f"client one: pid {os.getpid()}, ident {threading.get_ident()}")
db = connect(connect_arg)
db.execute("create table if not exists t(a)")
log("client one: created table")
time.sleep(.2)
rows = list(db.execute("select * from t"))
log(f"client one: {rows}")
db.create_function('pymax', 2, max)
log(f"client one: pymax: {list(db.execute('select pymax(1, 2)'))}")
db.close()
try:
db.execute("select 1")
assert False, "expected exception"
except Exception as e:
log(f"client one: select 1: error: {type(e).__name__}: {e}")
def client_two(connect, connect_arg):
log(f"client one: pid {os.getpid()}, ident {threading.get_ident()}")
db = connect(connect_arg)
time.sleep(.1)
with db as _:
cursor = db.execute("insert into t values (1), (2)")
log("client two: inserted")
try:
os.remove(PATH)
except FileNotFoundError:
pass
manager = make_manager_cls(PATH)(ADDR)
with manager:
one = Thread(target=client_one, args=(connect, ADDR))
two = Thread(target=client_two, args=(connect, ADDR))
one.start()
two.start()
one.join()
two.join()
Thread(target=client_one, args=(sqlite3.connect, PATH)).start()
Thread(target=client_two, args=(sqlite3.connect, PATH)).start()
"""
In which we use multiprocessing managers and proxies
to run queries on multiple remote SQLite connections to the same database.
Unlike in 05-manager-db.py, we use MakeProxyType to generate proxy methods.
MakeProxyType is used in multiprocessing.managers, but not documented.
Also, we expose more methods/properties, and re-implement __exit__ client-side.
"""
import os
import time
from multiprocessing.managers import BaseManager, MakeProxyType
from threading import Thread
import threading
import sqlite3
def log(*args):
# join before printing to avoid overlapping lines
print(' '.join(map(str, args)) + '\n', end='')
ADDR = ('127.0.0.1', 5000)
PATH = '/tmp/s3c.sqlite'
class Connection(sqlite3.Connection):
def _enter(self):
self.__enter__()
def _get_in_transaction(self):
return self.in_transaction
def cursor(self):
# we must use our own Cursor, so we don't allow cursor(factory=...)
# (this seems to be DB-API 2 compliant; factory= is a sqlite3 feature);
# allowing a subclass of our own Cursor may work, but it's not worth the bother
return super().cursor(factory=Cursor)
# execute and executemany obtain their cursor by calling cursor(),
# so we don't need to override them
ConnectionProxyBase = MakeProxyType(
'ConnectionProxyBase',
(
# these need to be exposed even if they're only used via _callmethod('_enter')
'_enter',
'_get_in_transaction',
'close',
'commit',
'create_function',
'cursor',
'execute',
# TODO: an arbitrary iterator (e.g. generator) won't work with executemany
'executemany',
'rollback',
)
)
class ConnectionProxy(ConnectionProxyBase):
_method_to_typeid_ = {
'cursor': 'Cursor',
'execute': 'Cursor',
'executemany': 'Cursor',
}
def __enter__(self):
# can't _callmethod('__enter__') because it returns the connection,
# which is not picklable
self._enter()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# can't _callmethod('__exit__') because exc_tb is not picklable
if exc_val:
self.rollback()
else:
self.commit()
@property
def in_transaction(self):
return self._callmethod('_get_in_transaction')
class Cursor(sqlite3.Cursor):
def _get_rowcount(self):
return self.rowcount
CursorProxyBase = MakeProxyType(
'CursorProxyBase',
(
'_get_rowcount',
# TODO: __next__ could be more efficient if it would fetch stuff in batches on the client side
'__next__',
'close',
'execute',
'executemany',
'fetchall',
'fetchone',
)
)
class CursorProxy(CursorProxyBase):
# TODO: do we care these return a new proxy to the same cursor instead of the same proxy?
_method_to_typeid_ = {
'execute': 'Cursor',
'executemany': 'Cursor',
}
def __iter__(self):
return self
@property
def rowcount(self):
return self._callmethod('_get_rowcount')
def make_manager_cls(*args, **kwargs):
# We can't make get_db() an argument, because factory must be forced
# to a Connection for proxying to work.
def get_db():
return sqlite3.connect(
*args,
factory=Connection,
check_same_thread=False,
**kwargs,
)
Manager = type('Manager', (BaseManager,), {})
Manager.register('Connection', get_db, proxytype=ConnectionProxy)
Manager.register('Cursor', proxytype=CursorProxy, create_method=False)
return Manager
def connect(addr):
manager = make_manager_cls()(addr)
manager.connect()
db = manager.Connection()
return db
def client_one(connect, connect_arg):
log(f"client one: pid {os.getpid()}, ident {threading.get_ident()}")
db = connect(connect_arg)
db.execute("create table if not exists t(a)")
log("client one: created table")
time.sleep(.2)
cursor = db.execute("select * from t")
log(f"client one: cursor.rowcount, list(cursor): {cursor.rowcount}, {list(cursor)}")
db.create_function('pymax', 2, max)
log(f"client one: pymax: {list(db.execute('select pymax(1, 2)'))}")
db.close()
try:
db.execute("select 1")
assert False, "expected exception"
except Exception as e:
log(f"client one: select 1: error: {type(e).__name__}: {e}")
def client_two(connect, connect_arg):
log(f"client two: pid {os.getpid()}, ident {threading.get_ident()}")
db = connect(connect_arg)
time.sleep(.1)
with db as db_enter:
log(f"client two: db is db.__enter__(): {db is db_enter}")
cursor = db.cursor()
execute_cursor = cursor.execute("insert into t values (1), (2)")
log(f"client two: db.cursor() is cursor.execute(): {cursor is execute_cursor}")
log(f"client two: db.cursor().rowcount, cursor.execute().rowcount: {cursor.rowcount}, {execute_cursor.rowcount}")
try:
log(f"client two: db.in_transaction before with: {db.in_transaction}")
with db:
db.execute("insert into t values (3), (4)")
log(f"client two: db.in_transaction inside with: {db.in_transaction}")
1/0
except ZeroDivisionError:
pass
log("client two: inserted")
try:
os.remove(PATH)
except FileNotFoundError:
pass
manager = make_manager_cls(PATH)(ADDR)
with manager:
one = Thread(target=client_one, args=(connect, ADDR))
two = Thread(target=client_two, args=(connect, ADDR))
one.start()
two.start()
one.join()
two.join()
Thread(target=client_one, args=(sqlite3.connect, PATH)).start()
Thread(target=client_two, args=(sqlite3.connect, PATH)).start()
"""
In which we use multiprocessing managers and proxies
to run queries on multiple remote SQLite connections to the same database.
Instead of using hand-made attribute getters/setters like in
06-manager-db-makeproxytype.py, we expose and use __getattribute__/__setattr__;
this removes the need for custom connection/cursor classes in the server.
Also, we can leave the connection __enter__ as a noop, since it's also a noop
in the original implementation:
https://docs.python.org/3/library/sqlite3.html#sqlite3-controlling-transactions
"""
import os
import time
from multiprocessing.managers import BaseManager, MakeProxyType
from threading import Thread
import threading
import sqlite3
def log(*args):
# join before printing to avoid overlapping lines
print(' '.join(map(str, args)) + '\n', end='')
ADDR = ('127.0.0.1', 5000)
PATH = '/tmp/s3c.sqlite'
ConnectionProxyBase = MakeProxyType(
'ConnectionProxyBase',
(
'close',
'commit',
'create_function',
'cursor',
'execute',
# TODO: an arbitrary iterator (e.g. generator) won't work with executemany
'executemany',
'rollback',
)
)
class ConnectionProxy(ConnectionProxyBase):
# allows us to not implement getter and setter methods on the Connection object
_exposed_ = ConnectionProxyBase._exposed_ + ('__getattribute__', '__setattr__')
_method_to_typeid_ = {
'cursor': 'Cursor',
'execute': 'Cursor',
'executemany': 'Cursor',
}
def __enter__(self):
# __enter__ is a noop in the original implementation
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# can't _callmethod('__exit__') because exc_tb is not picklable
if exc_val:
self.rollback()
else:
self.commit()
@property
def in_transaction(self):
return self._callmethod('__getattribute__', ('in_transaction',))
@property
def isolation_level(self):
return self._callmethod('__getattribute__', ('isolation_level',))
@isolation_level.setter
def isolation_level(self, value):
return self._callmethod('__setattr__', ('isolation_level', value))
CursorProxyBase = MakeProxyType(
'CursorProxyBase',
(
# TODO: __next__ could be more efficient if it would fetch stuff in batches on the client side
'__next__',
'close',
'execute',
'executemany',
'fetchall',
'fetchone',
)
)
class CursorProxy(CursorProxyBase):
_exposed_ = CursorProxyBase._exposed_ + ('__getattribute__',)
# TODO: do we care these return a new proxy to the same cursor instead of the same proxy?
_method_to_typeid_ = {
'execute': 'Cursor',
'executemany': 'Cursor',
}
def __iter__(self):
return self
@property
def rowcount(self):
return self._callmethod('__getattribute__', ('rowcount',))
def make_manager_cls(*args, **kwargs):
def get_db():
return sqlite3.connect(*args, check_same_thread=False, **kwargs)
Manager = type('Manager', (BaseManager,), {})
Manager.register('Connection', get_db, proxytype=ConnectionProxy)
Manager.register('Cursor', proxytype=CursorProxy, create_method=False)
return Manager
def connect(addr):
manager = make_manager_cls()(addr)
manager.connect()
db = manager.Connection()
return db
def client_one(connect, connect_arg):
log(f"client one: pid {os.getpid()}, ident {threading.get_ident()}")
db = connect(connect_arg)
db.execute("create table if not exists t(a)")
log("client one: created table")
time.sleep(.2)
cursor = db.execute("select * from t")
log(f"client one: cursor.rowcount, list(cursor): {cursor.rowcount}, {list(cursor)}")
db.create_function('pymax', 2, max)
log(f"client one: pymax: {list(db.execute('select pymax(1, 2)'))}")
db.close()
try:
db.execute("select 1")
assert False, "expected exception"
except Exception as e:
log(f"client one: select 1: error: {type(e).__name__}: {e}")
def client_two(connect, connect_arg):
log(f"client two: pid {os.getpid()}, ident {threading.get_ident()}")
db = connect(connect_arg)
time.sleep(.1)
with db as db_enter:
log(f"client two: db is db.__enter__(): {db is db_enter}")
cursor = db.cursor()
execute_cursor = cursor.execute("insert into t values (1), (2)")
log(f"client two: db.cursor() is cursor.execute(): {cursor is execute_cursor}")
log(f"client two: db.cursor().rowcount, cursor.execute().rowcount: {cursor.rowcount}, {execute_cursor.rowcount}")
log(f"client two: db.isolation_level: {db.isolation_level}")
db.isolation_level = 'IMMEDIATE'
log(f"client two: db.isolation_level: {db.isolation_level}")
db.isolation_level = ''
log(f"client two: db.isolation_level: {db.isolation_level}")
try:
log(f"client two: db.in_transaction before with: {db.in_transaction}")
with db:
db.execute("insert into t values (3), (4)")
log(f"client two: db.in_transaction inside with: {db.in_transaction}")
1/0
except ZeroDivisionError:
pass
log("client two: inserted")
try:
os.remove(PATH)
except FileNotFoundError:
pass
manager = make_manager_cls(PATH)(ADDR)
with manager:
one = Thread(target=client_one, args=(connect, ADDR))
two = Thread(target=client_two, args=(connect, ADDR))
one.start()
two.start()
one.join()
two.join()
Thread(target=client_one, args=(sqlite3.connect, PATH)).start()
Thread(target=client_two, args=(sqlite3.connect, PATH)).start()
"""
In which we use multiprocessing managers and proxies
to run queries on multiple remote SQLite connections
to the same or different databases.
The additions to 07-manager-db-more-magic.py come from multiple iterations,
but since the file was getting big I merged them in a single version.
* connect() allows connecting to more than one database per server.
* Remote calls when iterating over a cursor are batched (see BatchNextMixin).
* executemany() works with generators (see BatchExecuteManyMixin).
* There are some benchmarks.
Some notes on how the multiprocessing manager works
---------------------------------------------------
The manager has 2 modes of operation, server and client.
For server, it needs to be used within .start()/.shutdown() calls,
or as a context manager, or by calling .serve_forever() (blocking).
For client, .connect() must be called first, which just pings the server
once to see if it's alive, with no connection maintained after that.
Each new proxy object creates a new separate connection; because of this,
there can't be a client-side disconnect()/shutdown() manager method.
The connection closes itself when the proxy gets garbage collected,
which in turn deletes the shared object on the server;
https://docs.python.org/3/library/multiprocessing.html#cleanup
It may be possible to close the proxy's connection explicitly by calling
proxy._close(), but it is undocumented/private;
https://github.com/python/cpython/blob/3.8/Lib/multiprocessing/managers.py#L871
This may be needed if the garbage collection delay is not acceptable
(on PyPy objects are not freed instantly when they are no longer reachable).
Unrelated note: The proxy connections are created lazily on the first
method call, one per thread. Each connection has a corresponding
worker thread in the server.
"""
import os
import sys
import time
from multiprocessing.managers import BaseManager, MakeProxyType
from threading import Thread
import threading
import sqlite3
import collections
import itertools
def log(*args):
# join before printing to avoid overlapping lines
print(' '.join(map(str, args)) + '\n', end='')
ADDR = ('127.0.0.1', 5000)
PATH = '/tmp/s3c.sqlite'
_ConnectionProxyBase = MakeProxyType(
'_ConnectionProxyBase',
(
'close',
'commit',
'create_function',
# TODO: cursor(factory=...) may not work
'cursor',
'execute',
'executemany',
'rollback',
)
)
class ConnectionProxyBase(_ConnectionProxyBase):
# allows us to not implement getter and setter methods on the Connection object
_exposed_ = _ConnectionProxyBase._exposed_ + ('__getattribute__', '__setattr__')
# NOTE: methods that take callables only work with picklable callables
_method_to_typeid_ = {
'cursor': 'Cursor',
'execute': 'Cursor',
'executemany': 'Cursor',
}
def __enter__(self):
# __enter__ is a noop in the original implementation
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# can't _callmethod('__exit__') because exc_tb is not picklable
if exc_val:
self.rollback()
else:
self.commit()
@property
def in_transaction(self):
return self._callmethod('__getattribute__', ('in_transaction',))
@property
def isolation_level(self):
return self._callmethod('__getattribute__', ('isolation_level',))
@isolation_level.setter
def isolation_level(self, value):
return self._callmethod('__setattr__', ('isolation_level', value))
_CursorProxyBase = MakeProxyType(
'_CursorProxyBase',
(
'__next__',
'close',
'execute',
'executemany',
'fetchall',
'fetchmany',
'fetchone',
)
)
class CursorProxyBase(_CursorProxyBase):
_exposed_ = _CursorProxyBase._exposed_ + ('__getattribute__',)
_method_to_typeid_ = {
'execute': 'Cursor',
'executemany': 'Cursor',
}
def __iter__(self):
return self
# If we don't override execute(many), they will return a new proxy
# to the same cursor instead of the same proxy. The new proxy
# may create an additional connection, but that doesn't seem to show up
# in the simple benchmarks we run on localhost.
#
# The new proxy may be an issue if the cursor proxy has client-side state
# (see RowCountMixin).
def execute(self, *args, **kwargs):
super().execute(*args, **kwargs)
return self
def executemany(self, *args, **kwargs):
super().executemany(*args, **kwargs)
return self
@property
def rowcount(self):
return self._callmethod('__getattribute__', ('rowcount',))
class BatchNextMixin:
"""__next__() cursor proxy implementation that retrieves rows in batches.
The using class must have a _batch_size defined.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._array = None
self._index = None
def __next__(self):
# makes things 3-4 times faster for 100 noop rows,
# 33 times faster for 10000 noop rows
if self._array is None:
self._array = self.fetchmany(self._batch_size)
self._index = 0
if not self._index < len(self._array):
if len(self._array) < self._batch_size:
raise StopIteration
self._array = self.fetchmany(self._batch_size)
self._index = 0
if not self._array:
raise StopIteration
rv = self._array[self._index]
self._index += 1
return rv
class BatchExecuteManyMixin:
"""executemany() implementation that supports generators.
The using class must have a _batch_size defined.
Compatible with both ConnectionProxy and CursorProxy.
Requires a CursorProxy that has a _rowcount attribute (see RowCountMixin).
"""
def executemany(self, operation, param_sets):
# only pay the chunking price for iterables of unknown length
# (otherwise for a 10k element param_sets list it takes 3-4x as much time)
if isinstance(param_sets, collections.abc.Sequence):
all_chunks = [param_sets]
else:
all_chunks = chunks(self._batch_size, param_sets)
try:
cursor = self
for i, chunk in enumerate(all_chunks):
chunk = list(chunk)
cursor = super(BatchExecuteManyMixin, cursor).executemany(operation, chunk)
# only pay for the extra rowcount call if we have more than one chunk;
# a custom Connection could send both the cursor and the rowcount in a single roundtrip, but this is simpler
if i != 0 or len(chunk) >= self._batch_size:
last_rowcount = super(RowCountMixin, cursor).rowcount
cursor._rowcount = (cursor._rowcount or 0) + last_rowcount
except Exception:
if hasattr(cursor, '_rowcount'):
cursor._rowcount = -1
raise
return cursor
def chunks(n, iterable):
"""chunks(2, 'ABCDE') --> AB CD E"""
# based on https://stackoverflow.com/a/8991553
# copy-pasted from reader._utils
it = iter(iterable)
while True:
chunk = itertools.islice(it, n)
try:
first = next(chunk)
except StopIteration:
break
yield itertools.chain([first], chunk)
class RowCountMixin:
"""Allow overriding .rowcount on a CursorProxy."""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._rowcount = None
@property
def rowcount(self):
if self._rowcount is not None:
return self._rowcount
return super().rowcount
def execute(self, *args, **kwargs):
self._rowcount = None
return super().execute(*args, **kwargs)
def executemany(self, *args, **kwargs):
self._rowcount = None
return super().executemany(*args, **kwargs)
class ConnectionProxy(BatchExecuteManyMixin, ConnectionProxyBase):
_batch_size = 256
class CursorProxy(RowCountMixin, BatchExecuteManyMixin, BatchNextMixin, CursorProxyBase):
connection_cls = ConnectionProxy
@property
def _batch_size(self):
return self.connection_cls._batch_size
def make_manager_cls(bases=(BaseManager,)):
def make_connection(*args, **kwargs):
return sqlite3.connect(*args, check_same_thread=False, **kwargs)
Manager = type('Manager', bases, {})
Manager.register('Connection', make_connection, proxytype=ConnectionProxy)
Manager.register('Cursor', proxytype=CursorProxy, create_method=False)
return Manager
def connect(address, authkey, *args, manager_bases=(BaseManager,), **kwargs):
manager = make_manager_cls(bases=manager_bases)(address, authkey)
manager.connect()
return manager.Connection(*args, **kwargs)
def client_one(connect, *args):
log(f"client one: pid {os.getpid()}, ident {threading.get_ident()}")
db = connect(*args)
db.execute("create table if not exists t(a)")
log("client one: created table")
time.sleep(1)
cursor = db.execute("select * from t")
log(f"client one: cursor.rowcount, list(cursor): {cursor.rowcount}, {list(cursor)}")
db.create_function('pymax', 2, max)
log(f"client one: pymax: {list(db.execute('select pymax(1, 2)'))}")
db.close()
try:
db.execute("select 1")
assert False, "expected exception"
except Exception as e:
log(f"client one: select 1: error: {type(e).__name__}: {e}")
def client_two(connect, *args):
log(f"client two: pid {os.getpid()}, ident {threading.get_ident()}")
db = connect(*args)
time.sleep(.1)
with db as db_enter:
log(f"client two: db is db.__enter__(): {db is db_enter}")
cursor = db.cursor()
execute_cursor = cursor.execute("insert into t values (0), (1), (2)")
log(f"client two: db.cursor() is cursor.execute(): {cursor is execute_cursor}")
log(f"client two: db.cursor().rowcount, cursor.execute().rowcount: {cursor.rowcount}, {execute_cursor.rowcount}")
with db:
cursor = db.executemany("insert into t values (?)", [(3,)])
log(f"client two: db.executemany().rowcount #1 {cursor.rowcount}")
cursor = db.executemany("insert into t values (?)", ((i,) for i in (4, 5, 6)))
log(f"client two: db.executemany().rowcount #2 {cursor.rowcount}")
cursor = db.cursor()
cursor.executemany("insert into t values (?)", [(7,), (8,), (9,)])
log(f"client two: db.cursor().executemany().rowcount #1 {cursor.rowcount}")
cursor.executemany("insert into t values (?)", ((i,) for i in (10, 11, 12, 13)))
log(f"client two: db.cursor().executemany().rowcount #2 {cursor.rowcount}")
list(cursor.execute("select 1;"))
log(f"client two: db.cursor().execute().rowcount for select {cursor.rowcount}")
try:
with db:
cursor = db.cursor()
cursor.executemany(
"insert into t values (?)",
[(i,) for i in range(300)]
)
rowcount = cursor.rowcount
cursor.executemany(
"insert into t values (?)",
map(lambda i: (1/i,), range(300, -1, -1))
)
except ZeroDivisionError:
pass
log(f"client two: executemany().rowcount exception: {rowcount}, {cursor.rowcount}")
log(f"client two: db.isolation_level: {db.isolation_level}")
db.isolation_level = 'IMMEDIATE'
log(f"client two: db.isolation_level: {db.isolation_level}")
db.isolation_level = ''
log(f"client two: db.isolation_level: {db.isolation_level}")
try:
log(f"client two: db.in_transaction before with: {db.in_transaction}")
with db:
db.execute("insert into t values (3), (4)")
log(f"client two: db.in_transaction inside with: {db.in_transaction}")
1/0
except ZeroDivisionError:
pass
def clean_up_db():
try:
os.remove(PATH)
except FileNotFoundError:
pass
# --- basic test
def main_test():
ConnectionProxy._batch_size = 2
clean_up_db()
manager = make_manager_cls()(ADDR, b'')
with manager:
one = Thread(target=client_one, args=(connect, ADDR, b'', PATH))
two = Thread(target=client_two, args=(connect, ADDR, b'', PATH))
one.start()
two.start()
one.join()
two.join()
log()
clean_up_db()
one = Thread(target=client_one, args=(sqlite3.connect, PATH))
two = Thread(target=client_two, args=(sqlite3.connect, PATH))
one.start()
two.start()
one.join()
two.join()
log()
# --- benchmark
import timeit
from multiprocessing.managers import SyncManager
# label, bases, stmt, direct_setup, remote_setup
TIME_DATA = [
(
"len(l)",
(SyncManager,),
"len(l)",
"l = list()",
"manager = make_manager_cls(bases)(ADDR, b'')\n"
"manager.connect()\n"
"l = manager.list()\n",
),
(
"select literal",
(BaseManager,),
"list(db.execute('select 1, 2'))",
"db = sqlite3.connect(PATH)",
"db = connect(ADDR, b'', PATH)",
),
(
"select literal reuse cursor",
(BaseManager,),
"list(cursor.execute('select 1, 2'))",
"db = sqlite3.connect(PATH)\ncursor = db.cursor()",
"db = connect(ADDR, b'', PATH)\ncursor = db.cursor()",
),
(
"select 1M blob",
(BaseManager,),
"list(db.execute('select randomblob(1024*1024)'))",
"db = sqlite3.connect(PATH)",
"db = connect(ADDR, b'', PATH)",
),
(
"select from 10k row table",
(BaseManager,),
"list(db.execute('select * from t where a >= 80 and b < 190'))",
"db = sqlite3.connect(PATH)\n"
"db.execute('create table t(a, b)')\n"
"with db:\n"
" db.executemany('insert into t values (?, ?)', [(i, i*2) for i in range(10000)])\n",
"db = connect(ADDR, b'', PATH)\n"
"db.execute('create table t(a, b)')\n"
"with db:\n"
" db.executemany('insert into t values (?, ?)', [(i, i*2) for i in range(10000)])\n",
),
(
"select 100 rows",
(BaseManager,),
"""list(db.execute('''
WITH RECURSIVE
cnt(x) AS (
SELECT 1
UNION ALL
SELECT x+1 FROM cnt
LIMIT 100
)
SELECT x FROM cnt;
'''))""",
"db = sqlite3.connect(PATH)",
"db = connect(ADDR, b'', PATH)",
),
(
"select 10000 rows",
(BaseManager,),
"""list(db.execute('''
WITH RECURSIVE
cnt(x) AS (
SELECT 1
UNION ALL
SELECT x+1 FROM cnt
LIMIT 10000
)
SELECT x FROM cnt;
'''))""",
"db = sqlite3.connect(PATH)",
"db = connect(ADDR, b'', PATH)",
),
(
"select 100 custom func rows",
(BaseManager,),
"""list(db.execute('''
WITH RECURSIVE
cnt(x) AS (
SELECT 1
UNION ALL
SELECT x+1 FROM cnt
LIMIT 100
)
SELECT pow(x, 2) FROM cnt;
'''))""",
"db = sqlite3.connect(PATH)\n"
"db.create_function('pow', 2, pow)\n",
"db = connect(ADDR, b'', PATH)\n"
"db.create_function('pow', 2, pow)\n",
),
(
"simple insert",
(BaseManager,),
"with db:\n"
" db.execute('insert into t values (1, 2)')\n",
"db = sqlite3.connect(PATH)\n"
"db.execute('create table t(a, b)')\n",
"db = connect(ADDR, b'', PATH)\n"
"db.execute('create table t(a, b)')\n",
),
(
"insert 100 rows",
(BaseManager,),
"with db:\n"
" db.executemany('insert into t values (?, ?)', [(i, i*2) for i in range(100)])\n",
"db = sqlite3.connect(PATH)\n"
"db.execute('create table t(a, b)')\n",
"db = connect(ADDR, b'', PATH)\n"
"db.execute('create table t(a, b)')\n",
),
(
"insert 10000 rows",
(BaseManager,),
"with db:\n"
" db.executemany('insert into t values (?, ?)', [(i, i*2) for i in range(10000)])\n",
"db = sqlite3.connect(PATH)\n"
"db.execute('create table t(a, b)')\n",
"db = connect(ADDR, b'', PATH)\n"
"db.execute('create table t(a, b)')\n",
),
]
TIME_ADDRESSES = {
# AF_UNIX seems 50% faster than AF_INET, depending on the query
'AF_UNIX': 'unix-socket-i-guess',
'AF_INET': ('127.0.0.1', 5000),
}
def main_time():
# "Python 3.8 changes the default mode of multiprocessing on MacOS to spawn instead of fork", from
# https://github.com/huge-success/sanic/issues/1774#issuecomment-579182577
#
# TODO: To get this to work with any context other than fork,
# Manager should be importable.
import multiprocessing
fork_context = multiprocessing.get_context('fork')
for family, ADDR in TIME_ADDRESSES.items():
for label, bases, stmt, direct_setup, remote_setup in TIME_DATA:
clean_up_db()
direct = timeit.timeit(stmt, direct_setup, globals=globals(), number=100)
clean_up_db()
with make_manager_cls(bases=bases)(ADDR, b'', ctx=fork_context):
vars = dict(globals())
vars.update(locals())
remote = timeit.timeit(stmt, remote_setup, globals=vars, number=100)
log(f"{family:<7} {label:<39} {remote/direct:>9.2f}")
# --- main
if __name__ == '__main__':
if sys.argv[1] == 'test':
main_test()
if sys.argv[1] == 'time':
main_time()
import sqlite3
import timeit
from reader import make_reader
from reader import _sqlite_utils, _storage
import types
sqliteserver = types.ModuleType('sqliteserver')
exec(open('08-manager-db-everything.py').read(), sqliteserver.__dict__)
def open_sqlite_db(
path,
*,
create,
version,
migrations,
minimum_sqlite_version,
required_sqlite_compile_options=(),
timeout=None
):
address, authkey, path = path
db = sqliteserver.connect(address, authkey, path, detect_types=sqlite3.PARSE_DECLTYPES)
try:
_sqlite_utils.require_sqlite_compile_options(db, required_sqlite_compile_options)
db.execute("PRAGMA foreign_keys = ON;")
migration = _sqlite_utils.HeavyMigration(create, version, migrations)
migration.migrate(db)
return db
except BaseException:
db.close()
raise
stmts = [
('for _ in reader.get_feeds(): pass', 10),
('for _ in reader.get_entries(): pass', 2),
('for _ in reader.search_entries("rocket"): pass', 10),
('reader.is_search_enabled()', 100),
('reader.mark_as_read(next(reader.get_entries(feed="http://www.hellointernet.fm/podcast?format=rss")))', 100),
# These may not do exactly the same thing; for best results,
# call update and update_search() before running the benchmark.
('reader.update_feeds(workers=20)', 1),
('reader.disable_search(); reader.enable_search(); reader.update_search()', 1),
]
with sqliteserver.make_manager_cls()('db.sqlite.sock', b''):
for stmt, number in stmts:
print(stmt)
reader = make_reader('db.sqlite')
local = timeit.timeit(stmt, globals=globals(), number=number)
print('local', local)
reader.close()
original_open_sqlite_db = _storage.open_sqlite_db
_storage.open_sqlite_db = open_sqlite_db
try:
reader = make_reader(('db.sqlite.sock', b'', 'db.sqlite'))
remote = timeit.timeit(stmt, globals=globals(), number=number)
print('remote', remote)
reader.close()
finally:
_storage.open_sqlite_db = original_open_sqlite_db
print('remote/local', remote/local)
print()
"""
In which we see if table-level locking actually works when using
the "sqlite3 server".
If it works, both client_one and client_two should insert successfully
when using the server and normal+Thread (since the threads are in the
same process), but not when using normal+Process.
All 3 scenarios fail with OperationalError.
I think enabling shared cache does work, since in the server and normal+Thread
scenarios client_one gets "OperationalError: database table is locked",
but in the normal+Process one it gets "OperationalError: database is locked"
(note the missing "table").
Update:
"database table is locked" seems to correspond to SQLITE_LOCKED,
unlike "database is locked" which corresponds to SQLITE_BUSY;
https://github.com/python/cpython/blob/3.8/Modules/_sqlite/connection.c#L1622
Per https://www.sqlite.org/rescode.html#locked,
> [...] whereas SQLITE_LOCKED indicates a conflict within the same database
> connection (or sometimes a database connection with a shared cache).
which isn't necessarily saying something new.
---
See the gist readme for some (inconclusive) research on this:
https://gist.github.com/lemon24/1f35deed9da79dc20d9c538aa0f5e0cc
"""
import sqlite3
import time
from threading import Thread
from multiprocessing import Process
import types
import sys
sqliteserver = types.ModuleType('sqliteserver')
exec(open('08-manager-db-everything.py').read(), sqliteserver.__dict__)
sys.modules['sqliteserver'] = sqliteserver
from sqliteserver import log, ADDR, PATH, clean_up_db, make_manager_cls, connect
def client_one(connect, *args, **kwargs):
db = connect(*args, **kwargs)
with db:
db.execute("create table if not exists t(a)")
db.execute("create table if not exists u(b)")
log("one: created tables")
time.sleep(2)
try:
log("one: begin transaction")
with db:
db.execute("insert into t values (1)")
log("one: end")
except sqlite3.Error as e:
log(f"one: error: {type(e).__name__}: {e}")
time.sleep(2)
log("one:", *db.execute('select * from t union values (null) union select * from u'))
def client_two(connect, *args, **kwargs):
db = connect(*args, **kwargs)
time.sleep(1)
try:
log("two: begin transaction")
with db:
db.execute("insert into u values (2)")
time.sleep(2)
log("two: end")
except sqlite3.Error as e:
log(f"one: error: {type(e).__name__}: {e}")
log("two:", *db.execute('select * from t union values (null) union select * from u'))
PATH = f'file:{PATH}?cache=shared'
kwargs = dict(uri=True, timeout=0.1)
clean_up_db()
manager = make_manager_cls()(ADDR, b'')
with manager:
one = Thread(target=client_one, args=(connect, ADDR, b'', PATH), kwargs=kwargs)
two = Thread(target=client_two, args=(connect, ADDR, b'', PATH), kwargs=kwargs)
one.start()
two.start()
one.join()
two.join()
log()
clean_up_db()
one = Thread(target=client_one, args=(sqlite3.connect, PATH), kwargs=kwargs)
two = Thread(target=client_two, args=(sqlite3.connect, PATH), kwargs=kwargs)
one.start()
two.start()
one.join()
two.join()
log()
clean_up_db()
one = Process(target=client_one, args=(sqlite3.connect, PATH), kwargs=kwargs)
two = Process(target=client_two, args=(sqlite3.connect, PATH), kwargs=kwargs)
one.start()
two.start()
one.join()
two.join()
log()
$ python -u 08-manager-db-everything.py test
client one: pid 42265, ident 123145477623808
client two: pid 42265, ident 123145482878976
client one: created table
client two: db is db.__enter__(): True
client two: db.cursor() is cursor.execute(): True
client two: db.cursor().rowcount, cursor.execute().rowcount: 3, 3
client two: db.executemany().rowcount #1 1
client two: db.executemany().rowcount #2 3
client two: db.cursor().executemany().rowcount #1 3
client two: db.cursor().executemany().rowcount #2 4
client two: db.cursor().execute().rowcount for select -1
client two: executemany().rowcount exception: 300, -1
client two: db.isolation_level:
client two: db.isolation_level: IMMEDIATE
client two: db.isolation_level:
client two: db.in_transaction before with: False
client two: db.in_transaction inside with: True
client one: cursor.rowcount, list(cursor): -1, [(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,), (10,), (11,), (12,), (13,)]
client one: pymax: [(2,)]
client one: select 1: error: ProgrammingError: Cannot operate on a closed database.
client one: pid 42265, ident 123145477623808
client two: pid 42265, ident 123145482878976
client one: created table
client two: db is db.__enter__(): True
client two: db.cursor() is cursor.execute(): True
client two: db.cursor().rowcount, cursor.execute().rowcount: 3, 3
client two: db.executemany().rowcount #1 1
client two: db.executemany().rowcount #2 3
client two: db.cursor().executemany().rowcount #1 3
client two: db.cursor().executemany().rowcount #2 4
client two: db.cursor().execute().rowcount for select -1
client two: executemany().rowcount exception: 300, -1
client two: db.isolation_level:
client two: db.isolation_level: IMMEDIATE
client two: db.isolation_level:
client two: db.in_transaction before with: False
client two: db.in_transaction inside with: True
client one: cursor.rowcount, list(cursor): -1, [(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,), (10,), (11,), (12,), (13,)]
client one: pymax: [(2,)]
client one: select 1: error: ProgrammingError: Cannot operate on a closed database.
$ python -u 08-manager-db-everything.py time
AF_UNIX len(l) 900.08
AF_UNIX select literal 276.11
AF_UNIX select literal reuse cursor 573.59
AF_UNIX select 1M blob 1.55
AF_UNIX select from 10k row table 2.99
AF_UNIX select 100 rows 20.57
AF_UNIX select 10000 rows 2.59
AF_UNIX select 100 custom func rows 11.49
AF_UNIX simple insert 3.40
AF_UNIX insert 100 rows 2.48
AF_UNIX insert 10000 rows 1.43
AF_INET len(l) 1400.28
AF_INET select literal 619.01
AF_INET select literal reuse cursor 1006.34
AF_INET select 1M blob 1.67
AF_INET select from 10k row table 4.29
AF_INET select 100 rows 35.16
AF_INET select 10000 rows 3.62
AF_INET select 100 custom func rows 19.44
AF_INET simple insert 4.47
AF_INET insert 100 rows 4.08
AF_INET insert 10000 rows 1.64
$ python -u 09-reader.py 2>/dev/null
for _ in reader.get_feeds(): pass
local 0.02864729599999999
remote 0.05666920400000003
remote/local 1.9781693881335276
for _ in reader.get_entries(): pass
local 4.877947672
remote 5.669038156
remote/local 1.1621769106997504
for _ in reader.search_entries("rocket"): pass
local 0.16466828899999975
remote 0.18708253099999972
remote/local 1.1361175374816703
reader.is_search_enabled()
local 0.003241604999999481
remote 0.15524679099999972
remote/local 47.8919519805851
reader.mark_as_read(next(reader.get_entries(feed="http://www.hellointernet.fm/podcast?format=rss")))
local 1.2638606570000004
remote 2.0656188429999993
remote/local 1.6343722953629363
reader.update_feeds(workers=20)
local 35.918500501
remote 19.653734900000003
remote/local 0.5471758181957743
reader.disable_search(); reader.enable_search(); reader.update_search()
local 41.24534539599999
remote 42.63562837799999
remote/local 1.0337076333984303