Last active
March 18, 2018 03:16
-
-
Save mjkramer/20ad02c4540b27f236969e38c888877b to your computer and use it in GitHub Desktop.
a teeny tiny ultra-minimal "ORM" for sqlite
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python2.7 | |
import sqlite3 | |
class DBWriter(object): | |
class Model(object): | |
pass | |
class ModelBox(object): | |
pass | |
def __init__(self, dbname, initsql=None, batchsize=10): | |
self.con = sqlite3.connect(dbname) | |
self.models = DBWriter.ModelBox() | |
self._batchsize = batchsize | |
self._queue = {} | |
if initsql: | |
self.con.executescript(initsql) | |
for row in self.con.execute('select * from sqlite_master'): | |
table, model = row[1], DBWriter.Model() | |
setattr(self.models, table, model) | |
for col_info in self.con.execute('pragma table_info(%s)' % table): | |
name = col_info[1] | |
setattr(model, name, None) | |
self._queue[table] = [] | |
def write(self, table=None): | |
tables = [table] if table else self.models.__dict__.keys() | |
for t in tables: | |
row = getattr(self.models, t).__dict__.values() | |
self._queue[t].append(row) | |
self._maybe_flush(t, self._batchsize) | |
def flush(self): | |
for table in self.models.__dict__: | |
self._maybe_flush(table, 1) | |
def _maybe_flush(self, table, minrows): | |
if len(self._queue[table]) >= minrows: | |
self._flush(table) | |
def _flush(self, table): | |
cols = ', '.join(getattr(self.models, table).__dict__.keys()) | |
qmarks = ', '.join('?' * len(self._queue[table][0])) | |
stmt = 'INSERT INTO %s (%s) VALUES (%s)' % (table, cols, qmarks) | |
self.con.executemany(stmt, self._queue[table]) | |
self.con.commit() | |
self._queue[table] = [] | |
def __enter__(self): | |
return self | |
def __exit__(self, _exc_type, _exc_value, _traceback): | |
self.flush() | |
def test(): | |
initsql = 'create table t1 (a, b, c); create table t2 (d, e, f)' | |
w = DBWriter(':memory:', initsql) | |
w.models.t1.a = 1 | |
w.models.t1.b = 2 | |
w.models.t1.c = 3 | |
w.models.t2.d = 4 | |
w.models.t2.e = 5 | |
w.models.t2.f = 6 | |
w.write() | |
w.flush() | |
print(w.con.execute('select * from t1').fetchall()) | |
print(w.con.execute('select * from t2').fetchall()) | |
with DBWriter('test.sqlite', initsql) as w: | |
for i in range(15): | |
w.models.t1.a = 1 * i | |
w.models.t1.b = 2 * i | |
w.models.t1.c = 3 * i | |
w.write('t1') | |
w.models.t2.d = 4 * i | |
w.models.t2.e = 5 * i | |
w.models.t2.f = 6 * i | |
w.write('t2') | |
db = sqlite3.connect('test.sqlite') | |
print(db.execute('select * from t1').fetchall()) | |
print(db.execute('select * from t2').fetchall()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment