Skip to content

Instantly share code, notes, and snippets.

@georgexsh
Created January 17, 2022 06:06
Show Gist options
  • Save georgexsh/2291d0719d28b44a054642098fbc79fc to your computer and use it in GitHub Desktop.
Save georgexsh/2291d0719d28b44a054642098fbc79fc to your computer and use it in GitHub Desktop.
sqlite_kv_concurrent.py
import time
import sqlite3
import os
import random
import multiprocessing
class Store1:
    """Integer key-value store sharded across tables of one SQLite file.

    Key ``k`` lives in table ``kv_{k % buckets}``; every table has the schema
    ``(key integer primary key, value integer) without rowid``.  The database
    is switched to WAL journal mode on open.
    """

    def __init__(self, filename="kv.db", buckets=10):
        """Open (creating if needed) *filename* and its ``buckets`` tables.

        Args:
            filename: SQLite database path (``":memory:"`` also works).
            buckets: number of tables keys are spread over; must be >= 1.

        Raises:
            ValueError: if *buckets* is less than 1 (no table to route to).
        """
        if buckets < 1:
            raise ValueError(f"buckets must be >= 1, got {buckets!r}")
        self.buckets = buckets
        # timeout=60: seconds to wait on a lock held by another connection
        # before sqlite3 raises OperationalError.
        self.conn = sqlite3.connect(filename, timeout=60)
        self.conn.execute("pragma journal_mode=wal")
        for n in range(buckets):
            self.conn.execute(
                f'create table if not exists "kv_{n}" '
                "(key integer primary key, value integer) without rowid"
            )
        self.conn.commit()

    def _get_table(self, key):
        """Return the name of the table that holds *key*.

        Raises:
            TypeError: if *key* is not an ``int``.
        """
        # An explicit raise (not ``assert``) so the check survives ``python -O``;
        # it also guarantees the interpolated table name is always ``kv_<int>``.
        if not isinstance(key, int):
            raise TypeError(f"key must be an int, got {type(key).__name__}")
        return f"kv_{key % self.buckets}"

    def get(self, key):
        """Return the value stored for *key*, or ``None`` if it is absent."""
        row = self.conn.execute(
            f'select value from "{self._get_table(key)}" where key=?', (key,)
        ).fetchone()
        return None if row is None else row[0]

    def set(self, key, value):
        """Insert or overwrite *key* with *value* and commit immediately."""
        self.conn.execute(
            f'replace into "{self._get_table(key)}" (key, value) values (?,?)',
            (key, value),
        )
        self.conn.commit()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
class Store0:
    """Integer key-value store in a single SQLite table (no sharding).

    Rows live in table ``kv`` with schema
    ``(key integer primary key, value integer) without rowid``; the database
    is switched to WAL journal mode on open.
    """

    def __init__(self, filename="kv.db"):
        """Open (creating if needed) *filename* and its ``kv`` table.

        Args:
            filename: SQLite database path (``":memory:"`` also works).
        """
        # timeout=60: seconds to wait on a lock held by another connection
        # before sqlite3 raises OperationalError.
        self.conn = sqlite3.connect(filename, timeout=60)
        self.conn.execute("pragma journal_mode=wal")
        self.conn.execute(
            'create table if not exists "kv" '
            "(key integer primary key, value integer) without rowid"
        )
        self.conn.commit()

    def get(self, key):
        """Return the value stored for *key*, or ``None`` if it is absent."""
        # A Cursor object is always truthy, so it cannot signal "no row";
        # fetchone() returns None when the key is missing.
        row = self.conn.execute(
            'select value from "kv" where key=?', (key,)
        ).fetchone()
        return None if row is None else row[0]

    def set(self, key, value):
        """Insert or overwrite *key* with *value* and commit immediately."""
        self.conn.execute(
            'replace into "kv" (key, value) values (?,?)', (key, value)
        )
        self.conn.commit()

    def close(self):
        """Close the underlying SQLite connection."""
        self.conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
class Store:
    """Integer key-value store sharded across several SQLite files.

    Shard ``k`` is the file ``f"{prefix}_{k}.db"`` holding one ``kv`` table
    with schema ``(key integer primary key, value integer) without rowid``;
    key ``key`` is routed to shard ``key % buckets``.  Every shard file is
    switched to WAL journal mode on open.
    """

    def __init__(self, buckets=5, prefix="kv"):
        """Open (creating if needed) ``buckets`` shard files.

        Args:
            buckets: number of shard files; must be >= 1.
            prefix: path prefix of the shard files.  The default ``"kv"``
                gives ``kv_0.db``, ``kv_1.db``, ... in the current directory.

        Raises:
            ValueError: if *buckets* is less than 1.
        """
        if buckets < 1:
            raise ValueError(f"buckets must be >= 1, got {buckets!r}")
        self.buckets = buckets
        self.conns = []
        try:
            for n in range(buckets):
                # timeout=60: seconds to wait on a lock held by another
                # connection before sqlite3 raises OperationalError.
                conn = sqlite3.connect(f"{prefix}_{n}.db", timeout=60)
                # Track it right away so the cleanup below also covers it.
                self.conns.append(conn)
                conn.execute("pragma journal_mode=wal")
                conn.execute(
                    'create table if not exists "kv" '
                    "(key integer primary key, value integer) without rowid"
                )
                conn.commit()
        except Exception:
            # Don't leak the shards already opened if a later one fails.
            self.close()
            raise

    def _get_conn(self, key):
        """Return the connection of the shard that holds *key*.

        Raises:
            TypeError: if *key* is not an ``int``.
        """
        # An explicit raise (not ``assert``) so the check survives ``python -O``.
        if not isinstance(key, int):
            raise TypeError(f"key must be an int, got {type(key).__name__}")
        return self.conns[key % self.buckets]

    def get(self, key):
        """Return the rows stored for *key* as a list of ``(value,)`` tuples.

        The list is empty when *key* is absent and has exactly one element
        otherwise (``key`` is the primary key).  Unlike ``Store0.get`` and
        ``Store1.get``, this returns the raw row list, not the bare value.
        """
        return self._get_conn(key).execute(
            'select value from "kv" where key=?', (key,)
        ).fetchall()

    def set(self, key, value):
        """Insert or overwrite *key* with *value* and commit immediately."""
        conn = self._get_conn(key)
        conn.execute('replace into "kv" (key, value) values (?,?)', (key, value))
        conn.commit()

    def close(self):
        """Close every shard connection."""
        for conn in self.conns:
            conn.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
# Shared benchmark workload: ``n`` random integer keys in [0, 2**20], drawn
# with replacement (so duplicates are possible).
n = 10000
d = list(map(lambda _: random.randint(0, 2**20), range(n)))
# NOTE(review): presumably a diagnostic showing which process built ``d``;
# with a non-fork start method each child would re-run this module-level code
# and print its own pid and sum.
print("global", os.getpid(), sum(d))
random.shuffle(d)
def init(cls):
    """Seed a newly constructed ``cls`` store so each key in ``d`` maps to itself.

    Every ``set`` call on the stores in this file commits individually.
    """
    store = cls()
    put = store.set
    for key in d:
        put(key, key)
def worker(cls, n=None):
    """Open a fresh ``cls`` store and read every key in ``d`` once.

    Lookup results are discarded; the function only generates read load.
    The ``n`` parameter is accepted but unused.
    """
    lookup = cls().get
    for key in d:
        lookup(key)
def test(cls, c):
    """Benchmark *c* concurrent reader processes against a ``cls`` store.

    Runs one in-process warm-up pass, then starts *c* ``worker`` processes
    that each read all ``n`` keys of ``d``.  It waits for all of them and
    prints one row: concurrency, wall time, per-process reads/s and total
    reads/s.

    Raises:
        RuntimeError: if any worker exits with a non-zero exit code, so a
            crashed run is not reported as a (suspiciously fast) result.
    """
    worker(cls)  # warm up: one full read pass in this process
    # perf_counter is monotonic and high-resolution, unlike time.time().
    start = time.perf_counter()
    ps = []
    for _ in range(c):
        p = multiprocessing.Process(target=worker, args=(cls,))
        p.start()
        ps.append(p)
    # join() returns as soon as each child exits; polling is_alive() with a
    # 10 ms sleep both spins and adds up to 10 ms of error to the timing.
    for p in ps:
        p.join()
    cost = time.perf_counter() - start
    failed = [p.exitcode for p in ps if p.exitcode != 0]
    if failed:
        raise RuntimeError(
            f"{len(failed)} of {c} worker processes failed (exit codes {failed})"
        )
    print(f"{c:<10d}\t{cost:<7.2f}\t{n/cost:<20.2f}\t{n*c/cost:<14.2f}")
def main(cls=Store, max_concurrency=20):
    """Seed a ``cls`` store, then benchmark 1..*max_concurrency* reader processes.

    Args:
        cls: store class to benchmark (e.g. ``Store``, ``Store0`` or
            ``Store1``); it is constructed with no arguments.
        max_concurrency: largest number of concurrent worker processes tried.
    """
    # Workers read the module-level key list ``d``.  The "fork" start method
    # makes children inherit the parent's copy.  A spawning start method
    # re-imports this module in each child, regenerating ``d`` with different
    # random keys.
    multiprocessing.set_start_method("fork")
    init(cls)
    print("concurrency\ttime(s)\tper process TPS(r/s)\ttotal TPS(r/s)")
    for c in range(1, max_concurrency + 1):
        test(cls, c)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment