Skip to content

Instantly share code, notes, and snippets.

@mchaput
Created November 17, 2015 23:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mchaput/ecbcec8e70b8f88150a5 to your computer and use it in GitHub Desktop.
Save mchaput/ecbcec8e70b8f88150a5 to your computer and use it in GitHub Desktop.
import os.path
import sqlite3 as sqlite
from sqlite3 import Binary
from whoosh.kv.db import Database, DBReader, DBWriter, Cursor
from whoosh.kv.db import OverrunError, ReadOnlyError
class Sqlite(Database):
    """Key/value database kept in a single SQLite file.

    The file lives at ``<path>/<name>.sqlite``.  Pairs are stored in a table
    with a blob primary-key column ``k`` and a blob column ``v``; the table
    name defaults to ``default_table``.
    """

    # Table name used whenever a caller does not supply one.
    default_table = "kv"
    # NOTE(review): not read anywhere in this class; presumably consumed by
    # the Database base class or by callers -- confirm before removing.
    drop = True

    def __init__(self, path, name="main"):
        """Record where the database file lives; nothing is opened yet.

        :param path: directory holding the database file.
        :param name: file name without the ``.sqlite`` extension.
        """
        self.path = path
        self.name = name
        self.filepath = os.path.join(self.path, self.name + ".sqlite")

    def _connection(self):
        """Return a new sqlite3 connection to ``self.filepath``."""
        c = sqlite.connect(self.filepath)
        # c.execute("PRAGMA read_uncommitted = 1;")
        return c

    def create(self, table=None):
        """Create the directory (if needed) and the key/value table.

        :param table: table name; falls back to ``default_table``.
        :returns: ``self``, so calls can be chained.
        """
        table = table or self.default_table
        if self.path:
            try:
                os.mkdir(self.path)
            except FileExistsError:
                pass

        c = self._connection()
        try:
            # Identifiers cannot be bound as SQL parameters, so the table
            # name is concatenated; it must come from trusted code.
            c.execute(
                "create table if not exists "
                + table
                + " (k blob primary key, v blob);"
            )
            c.commit()
        finally:
            # The connection is only needed for the DDL statement; close it
            # instead of leaving it for the garbage collector.
            c.close()
        return self

    def destroy(self):
        """Delete the database file and its directory if that leaves it empty.

        Does nothing when ``self.path`` is empty; a missing file or
        directory is ignored.
        """
        if self.path:
            try:
                os.remove(self.filepath)
                if not os.listdir(self.path):
                    os.rmdir(self.path)
            except FileNotFoundError:
                pass

    def optimize(self):
        """Run ``VACUUM`` on the database file."""
        # _connection() takes no arguments; passing one raised TypeError.
        c = self._connection()
        try:
            c.execute("vacuum")
        finally:
            c.close()

    def open(self, write=False, create=False):
        """Open a new connection and wrap it in a reader or writer.

        :param write: return a ``SqliteWriter`` instead of a ``SqliteReader``.
        :param create: call :meth:`create` first.
        """
        if create:
            self.create()
        c = self._connection()
        return self.open_connection(c, write=write, create=create)

    def open_connection(self, conn, table=None, write=False, create=False,
                        closing=True):
        """Wrap an existing connection in a reader or writer.

        :param conn: an open sqlite3 connection.
        :param table: table name; falls back to ``default_table``.
        :param write: return a ``SqliteWriter`` instead of a ``SqliteReader``.
        :param create: when writing, call :meth:`create` first.
        :param closing: passed through to the reader/writer, which closes
            ``conn`` on close when this is true.
        """
        table = table or self.default_table
        if write:
            if create:
                self.create()
            return SqliteWriter(conn, table, closing=closing)
        return SqliteReader(conn, table, closing=closing)
class SqliteReader(DBReader):
    """Access to one key/value table through a sqlite3 connection.

    Keys and values are bound as blobs and returned as ``bytes``.
    """

    def __init__(self, connection, table, closing=True):
        """
        :param connection: an open sqlite3 connection.
        :param table: name of the table holding the ``k``/``v`` columns.
        :param closing: when true, :meth:`close` also closes ``connection``.
        """
        self._c = connection
        self._table = table
        self._closing = closing
        self.closed = False

    def __len__(self):
        """Return the number of rows in the table."""
        cur = self._c.execute("SELECT Count(*) FROM " + self._table)
        return cur.fetchone()[0]

    def __contains__(self, key):
        """Return True if a row with this exact key exists."""
        row = self._c.execute(
            "select 1 from " + self._table + " where k=? limit 1;",
            (Binary(key),),
        ).fetchone()
        return row is not None

    def __getitem__(self, key):
        """Return the value stored under ``key``.

        :raises KeyError: if the key is not present.
        """
        row = self._c.execute(
            "select v from " + self._table + " where k=? limit 1;",
            (Binary(key),),
        ).fetchone()
        if row is None:
            raise KeyError(key)
        return bytes(row[0])

    def cursor(self):
        """Return a ``SqliteCursor`` over this table."""
        return SqliteCursor(self._c.cursor(), self._table)

    def keys(self):
        """Yield every key in ascending order."""
        cur = self._c.execute("select k from " + self._table + " order by k;")
        for row in cur:
            yield bytes(row[0])

    def key_range(self, start, end):
        """Yield keys with ``start <= key < end`` in ascending order."""
        # Without an explicit ORDER BY the row order is unspecified; sort so
        # the result agrees with keys().
        cur = self._c.execute(
            "select k from "
            + self._table
            + " where k >= ? and k < ? order by k;",
            (Binary(start), Binary(end)),
        )
        for row in cur:
            yield bytes(row[0])

    def update_items(self, items):
        """Insert or replace every ``(key, value)`` pair from ``items``.

        NOTE(review): this writes to the table although it is defined on the
        reader class; ``SqliteWriter`` inherits it.
        """
        self._c.executemany(
            "insert or replace into " + self._table + " values (?, ?)",
            ((Binary(key), Binary(value)) for key, value in items),
        )

    def close(self):
        """Mark the reader closed, closing the connection if it is owned."""
        if self._closing:
            self._c.close()
        self.closed = True
class SqliteWriter(SqliteReader, DBWriter):
    """Read/write access to one key/value table inside a transaction.

    A ``begin immediate transaction`` is issued on construction; the
    transaction ends with :meth:`commit`, :meth:`cancel` or :meth:`close`.
    """

    def __init__(self, connection, table, closing=True):
        """
        :param connection: an open sqlite3 connection.
        :param table: name of the table holding the ``k``/``v`` columns.
        :param closing: when true, finishing the writer also closes
            ``connection``.
        """
        SqliteReader.__init__(self, connection, table, closing=closing)
        self._c.execute("begin immediate transaction")

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``, replacing any existing row."""
        self._c.execute(
            "insert or replace into " + self._table + " values (?, ?)",
            (Binary(key), Binary(value)),
        )

    def __delitem__(self, key):
        """Delete the row for ``key``; a missing key is not an error."""
        self._c.execute(
            "delete from " + self._table + " where k=?", (Binary(key),)
        )

    def delete_by_prefix(self, prefix):
        """Delete every row whose key starts with the bytes ``prefix``.

        An empty prefix matches every key.
        """
        prefix = bytes(prefix)
        # LIKE is ASCII case-insensitive by default and treats '%' and '_'
        # in the prefix as wildcards, so it can match keys that do not have
        # the prefix.  substr() on a blob counts bytes and '=' compares the
        # blobs exactly.
        self._c.execute(
            "delete from " + self._table + " where substr(k, 1, ?) = ?",
            (len(prefix), Binary(prefix)),
        )

    def _release(self):
        """Close the connection if it is owned and mark the writer closed."""
        if self._closing:
            self._c.close()
        self.closed = True

    def close(self):
        """Commit pending changes unless the writer is already finished."""
        # After commit()/cancel() the connection may already be closed;
        # committing again would raise on the closed connection.
        if not self.closed:
            self.commit()

    def cancel(self):
        """Roll back the transaction and finish the writer."""
        self._c.rollback()
        self._release()

    def commit(self):
        """Commit the transaction and finish the writer."""
        self._c.commit()
        self._release()
class SqliteCursor(Cursor):
    """Forward-only cursor over the ``k``/``v`` rows of a table, by key order.

    Wraps a DB-API cursor and keeps the most recently fetched row in
    ``self._row``; ``None`` means the cursor has run past the last row.
    """

    def __init__(self, cur, table):
        """Bind to ``cur`` and ``table`` and position on the first row."""
        self._table = table
        self._cur = cur
        self._row = None
        self.first()

    def is_active(self):
        """Return True while the cursor is positioned on a row."""
        return not (self._row is None)

    def first(self):
        """Restart the scan and position on the smallest key, if any."""
        query = "select k, v from " + self._table + " order by k;"
        self._cur.execute(query)
        self._row = self._cur.fetchone()

    def next(self):
        """Advance to the following row.

        :raises OverrunError: if the cursor is already past the last row.
        """
        if self._row is None:
            raise OverrunError
        self._row = self._cur.fetchone()

    def find(self, key, fromfirst=True):
        """Position on the first row whose key is ``>= key``.

        ``fromfirst`` is accepted but not used by this implementation.
        """
        query = (
            "select k, v from "
            + self._table
            + " where k >= ? order by k;"
        )
        params = (key,)
        self._cur.execute(query, params)
        self._row = self._cur.fetchone()

    def key(self):
        """Return the current row's key, or None when inactive."""
        current = self._row
        return None if current is None else current[0]

    def value(self):
        """Return the current row's value, or None when inactive."""
        current = self._row
        return None if current is None else current[1]

    def item(self):
        """Return the current ``(key, value)`` row, or None when inactive."""
        return self._row
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment