Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
#!/usr/bin/env python3
import sqlite3
import random
import string
from uuid import uuid4
from typing import List
from statistics import mean
def random_str():
str_length = random.randint(40,50)
return ''.join(random.choice(string.ascii_lowercase) for _ in range(str_length))
def sql_init(db_file: str):
conn = sqlite3.connect(db_file)
c = conn.cursor()
configure_sqlite(c)
c.execute('''CREATE TABLE IF NOT EXISTS some_text
(txt1 TEXT, txt2 TEXT, txt3 TEXT, txt4 TEXT, text5 TEXT)''')
return conn
def sql_init_with_constraint(db_file: str):
conn = sqlite3.connect(db_file)
c = conn.cursor()
configure_sqlite(c)
c.execute('''CREATE TABLE IF NOT EXISTS some_text
(txt1 TEXT PRIMARY KEY, txt2 TEXT, txt3 TEXT, txt4 TEXT, text5 TEXT)''')
return conn
def configure_sqlite(cursor: sqlite3.Cursor):
cursor.execute('PRAGMA optimize')
cursor.execute('PRAGMA journal_mode=WAL')
cursor.execute('PRAGMA LOCKING_MODE=EXCLUSIVE')
cursor.execute('PRAGMA synchronous=NORMAL')
cursor.execute('PRAGMA SQLITE_CONFIG_MULTITHREAD')
cursor.execute('PRAGMA SQLITE_DEFAULT_CACHE_SIZE=-4000')
cursor.execute('PRAGMA THREAD=4')
def sql_write(conn: sqlite3.Connection, data: List[List[str]]):
cursor = conn.cursor()
cursor.executemany('''INSERT INTO some_text VALUES
(?,?,?,?,?)''', rows)
conn.commit()
conn.close()
def txt_write(txt_file: str, data: List[List[str]]):
with open(txt_file, 'w') as f:
for row in data:
f.write(', '.join(row) + '\n')
def sql_select(conn: sqlite3.Connection, to_select: List[str]):
cursor = conn.cursor()
query = f'''SELECT * FROM some_text
WHERE txt1 IN ({','.join(['?']*len(to_select))})'''
cursor.execute(query, to_select)
return cursor.fetchall()
def txt_select(txt_file: str, to_select: List[str]):
result = []
with open(txt_file, 'r') as f:
for line in f:
fields = line.split()
if fields[0] in to_select:
result.append(line)
return line
nb_rows = 100000
rows = [[str(uuid4())] + [random_str() for _ in range(4)] for iteration in range(0,nb_rows)]
txt1_field_selector = [rows[random.randint(0,nb_rows)][0] for _ in range(0,100)]
db_file = 'perf_db_txt.db'
db_constraint_file = 'perf_db_txt.db'
txt_file= 'perf_db_txt.txt'
if __name__ == '__main__':
from timeit import repeat
t1 = repeat(stmt='txt_write(txt_file,rows)', repeat=3, number=1, setup='''
from __main__ import txt_write, rows, txt_file
import os
if os.path.exists(txt_file):
os.remove(txt_file)
''')
t2 = repeat(stmt='sql_write(conn,rows)', repeat=3, number=1, setup='''
from __main__ import sql_init, sql_write, rows, db_file
import os
if os.path.exists(db_file):
os.remove(db_file)
conn = sql_init(db_file)
''')
t3 = repeat(stmt='sql_write(conn,rows)', repeat=3, number=1, setup='''
from __main__ import sql_init_with_constraint, sql_write, rows, db_constraint_file
import os
if os.path.exists(db_constraint_file):
os.remove(db_constraint_file)
conn = sql_init_with_constraint(db_constraint_file)
''')
t4 = repeat(stmt='txt_select(txt_file,txt1_field_selector)', repeat=3, number=1, setup='''
from __main__ import txt_select, txt_file, txt1_field_selector
''')
t5 = repeat(stmt='sql_select(conn,txt1_field_selector)', repeat=3, number=1, setup='''
from __main__ import sql_init, sql_select, txt1_field_selector, db_file
conn = sql_init(db_file)
''')
t6 = repeat(stmt='sql_select(conn,txt1_field_selector)', repeat=3, number=1, setup='''
from __main__ import sql_init, sql_select, txt1_field_selector, db_constraint_file
conn = sql_init(db_constraint_file)
''')
print(f'text was writed in {mean(t1):.2f} seconds.')
print(f'database was writed in {mean(t2):.2f} seconds.')
print(f'database with constraint was writed in {mean(t3):.2f} seconds.')
print(f'text selected in {mean(t4):.2f} seconds.')
print(f'database selected in {mean(t5):.2f} seconds.')
print(f'database with constraint selected in {mean(t6):.2f} seconds.')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment