Skip to content

Instantly share code, notes, and snippets.

@apg
Created January 18, 2010 16:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save apg/280176 to your computer and use it in GitHub Desktop.
Save apg/280176 to your computer and use it in GitHub Desktop.
# broken, hacked up version of
# http://bret.appspot.com/entry/how-friendfeed-uses-mysql for sqlite
# database = bequas.Database('something.db')
# database.initialize(); database.create_index('hello', unique=True);
# database.insert({'hello': 'world'})
import sqlite3
import uuid
import datetime

try:
    import json
except ImportError:
    # The stdlib json module is unavailable; fall back to the third-party
    # simplejson package, which is used under the same name.
    import simplejson as json

# Enables the diagnostic prints that are guarded by ``if DEBUG:`` below.
DEBUG = True
class Backend(object):
    """SQLite storage backend used by Database.

    Owns one sqlite3 connection and wraps the handful of statements the
    document store needs.  Table, column and index names are interpolated
    into the SQL text, so they must come from trusted code; values are
    always bound through ``?`` placeholders.
    """

    def __init__(self, db, **kwargs):
        """Opens the connection.

        db is the owning Database; its _table_name/_index_name helpers are
        used by create_index.  The optional db_name keyword is passed to
        sqlite3.connect and defaults to '' when it is not supplied.
        """
        self._db = db
        self._conn = sqlite3.connect(kwargs.pop('db_name', ''))

    def close(self):
        """Closes the underlying connection.
        """
        self._conn.close()

    def initialize(self):
        """Creates the documents table if it does not exist yet.
        """
        statements = [
            # The declared type must be exactly "integer" for SQLite to
            # treat the column as the auto-assigned rowid; the previous
            # "integer auto_increment" declaration left row_id NULL.
            """CREATE TABLE IF NOT EXISTS documents (
                row_id integer primary key,
                id char(32) unique,
                contents blob,
                updated datetime,
                created datetime
            )""",
        ]
        for stmt in statements:
            # NOTE: printed regardless of DEBUG, as before.
            print('initialize:  %s' % stmt)
            self.execute(stmt)

    def create_index(self, column, unique=False):
        """Creates an index on the value column of the table for column.

        The table and index names come from the owning Database's
        _table_name and _index_name helpers.
        """
        keyword = 'UNIQUE ' if unique else ''
        ready_sql = 'CREATE %sINDEX %s ON %s (value)' % (
            keyword,
            self._db._index_name(column),
            self._db._table_name(column))
        if DEBUG:
            print('backend.create_index:  %s' % ready_sql)
        self.execute(ready_sql)

    def drop_index(self, table, column):
        """Drops the index named <table>_<column>_idx if it exists.
        """
        self.execute('DROP INDEX IF EXISTS %s_%s_idx' % (table, column))

    def create_table(self, name, columns):
        """Creates table name from an iterable of column definitions.
        """
        self.execute('CREATE TABLE %s (%s)' % (name, ','.join(columns)))

    def drop_table(self, name):
        """Drops table name if it exists.
        """
        self.execute('DROP TABLE IF EXISTS %s' % name)

    def query(self, sql, params=None, limit=None):
        """Runs a SELECT and returns its rows.

        limit == 1 returns a single row (or None when there is none), a
        larger positive limit returns a list of at most that many rows,
        and anything else returns every row.
        """
        cursor = self._conn.cursor()
        try:
            args = [sql]
            if params:
                args.append(params)
            # NOTE: printed regardless of DEBUG, as before.
            print('query args:  %s' % (args,))
            cursor.execute(*args)
            if limit == 1:
                return cursor.fetchone()
            if limit is not None and limit > 0:
                return cursor.fetchmany(limit)
            # fetchmany() without a size only returns cursor.arraysize
            # rows (one by default), which silently truncated results.
            return cursor.fetchall()
        finally:
            cursor.close()

    def execute(self, sql, params=None):
        """Runs one statement that returns no rows and commits it.
        """
        cursor = self._conn.cursor()
        try:
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            # sqlite3 opens an implicit transaction for data changes;
            # without a commit they are lost when the connection closes.
            self._conn.commit()
        finally:
            cursor.close()

    def insert(self, table, col_values):
        """Inserts one row into table from a column -> value mapping.
        """
        columns = []
        values = []
        for col, val in col_values.items():
            columns.append(col)
            values.append(val)
        new_sql = 'INSERT INTO %s (%s) VALUES (%s)' % (
            table,
            ','.join(columns),
            ','.join('?' for _ in columns))
        if DEBUG:
            print('insert:  %s %s' % (new_sql, values))
        return self.execute(new_sql, params=values)

    def get_indexes(self, where=None, params=None):
        """Returns the names of the indexes listed in sqlite_master.

        where is an optional SQL condition ANDed onto the query; params
        holds the values for any placeholders it contains.
        """
        sql = "SELECT name FROM sqlite_master WHERE type = 'index'"
        if where:
            sql = '%s AND %s' % (sql, where)
        if DEBUG:
            print('get_indexes: %s' % sql)
        return self._fetch_names(sql, params)

    def get_tables(self, where=None, params=None):
        """Returns the names of the tables listed in sqlite_master.

        where and params behave as in get_indexes.
        """
        sql = "SELECT name FROM sqlite_master WHERE type = 'table'"
        if where:
            sql = '%s AND %s' % (sql, where)
        return self._fetch_names(sql, params)

    def _fetch_names(self, sql, params=None):
        """Runs sql and returns the first column of every result row.
        """
        cursor = self._conn.cursor()
        try:
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            return [row[0] for row in cursor.fetchall()]
        finally:
            cursor.close()
class Database(object):
    """Wraps the database specific methods

    Documents are dictionaries stored as JSON in the documents table.
    Each indexed key has its own table (see _table_name) holding
    (document_id, value) rows with an index on value.
    """

    try:
        _string_types = basestring  # Python 2: accept str and unicode ids
    except NameError:
        _string_types = str

    def __init__(self, db_type='sqlite3', db_name=''):
        """Creates the backend.

        db_type is accepted for compatibility but not used.  db_name is
        handed to the backend as the database to open.
        """
        self._backend = Backend(self, db_name=db_name)

    def initialize(self):
        """Creates the base schema through the backend.
        """
        return self._backend.initialize()

    def insert(self, spec):
        """Inserts a document into the database

        spec must be a dictionary; it gains an '_id' key holding the new
        id, which is also returned.  Raises TypeError for any other type.
        """
        if not isinstance(spec, dict):
            raise TypeError('spec must be a dictionary')
        # UUID.hex is available on Python 2 and 3; get_hex() is not.
        new_id = uuid.uuid1().hex
        spec['_id'] = new_id
        document = {'id': new_id,
                    'contents': json.dumps(spec),
                    'created': datetime.datetime.utcnow()}
        self._backend.insert('documents', document)
        for idx in self.get_indexes_for_columns(spec.keys()):
            column = self._column_from_index(idx)
            self._backend.insert(self._table_name(column),
                                 {'document_id': new_id,
                                  'value': spec[column]})
        return new_id

    def update(self, id, spec):
        """Replaces the contents of document id with spec.

        spec must be a dictionary (TypeError otherwise) and gains an
        '_id' key.  Index rows are rewritten for every indexed key that
        is present in spec.
        """
        if not isinstance(spec, dict):
            raise TypeError('spec must be a dictionary')
        spec['_id'] = id
        self._backend.execute(
            'UPDATE documents SET contents = ?, updated = ? WHERE id = ?',
            params=(json.dumps(spec), datetime.datetime.utcnow(), id))
        for idx in self.get_indexes_for_columns(spec.keys()):
            column = self._column_from_index(idx)
            table = self._table_name(column)
            # Delete-then-insert covers both an existing index row and a
            # key that was not in the document before.
            self._backend.execute(
                'DELETE FROM %s WHERE document_id = ?' % table,
                params=(id,))
            self._backend.insert(table,
                                 {'document_id': id,
                                  'value': spec[column]})

    def delete(self, id):
        """Removes document id and its index rows; no-op if it is missing.
        """
        document = self.get(id)
        if document:
            for idx in self.get_indexes_for_columns(document.keys()):
                table = self._table_name(self._column_from_index(idx))
                self._backend.execute(
                    'DELETE FROM %s WHERE document_id = ?' % table,
                    params=(id,))
            self._backend.execute('DELETE FROM documents WHERE id = ?',
                                  params=(id,))

    def get(self, spec):
        """Returns a document specified by spec

        spec is either a document id (string) or a dictionary.  For a
        dictionary, every key that has an index is matched with LIKE
        against its index table; keys without an index are ignored, so a
        spec with no indexed keys applies no filter.  Returns the decoded
        document, or None when nothing matches.  Raises TypeError for any
        other spec type.
        """
        if isinstance(spec, self._string_types):
            ready_sql = "SELECT contents FROM documents WHERE id = ?"
            params = [spec]
        elif isinstance(spec, dict):
            joins = []
            wheres = ['1 = 1']
            params = []
            for idx in self.get_indexes_for_columns(spec.keys()):
                column_name = self._column_from_index(idx)
                table_name = self._table_name(column_name)
                joins.append(
                    'INNER JOIN %(n)s ON %(n)s.document_id = documents.id'
                    % {'n': table_name})
                wheres.append('%s.value like ?' % table_name)
                params.append(spec[column_name])
            ready_sql = 'SELECT contents FROM documents %s WHERE %s' % (
                ' '.join(joins), ' AND '.join(wheres))
        else:
            raise TypeError("get must be given an id or a spec")
        if DEBUG:
            print('get:  %s %s' % (ready_sql, params))
        record = self._backend.query(ready_sql, params=params, limit=1)
        if record:
            return json.loads(record[0])
        return None

    def all(self):
        """Returns a cursor object, providing all documents in the database
        which can then be filtered.

        Not implemented yet; currently returns None.
        """
        pass

    def create_index(self, column, unique=False):
        """Creates an index for column

        Only the index table and its SQL index are created here; rows
        are added to it by insert and update.
        """
        self._backend.create_table(self._table_name(column),
                                   ('document_id char(32) primary key',
                                    'value blob'))
        self._backend.create_index(column, unique=unique)

    def drop_index(self, column):
        """Drops the SQL index and the index table for column.
        """
        # The backend appends '_<column>_idx' to its first argument, which
        # yields the same name as _index_name(column).
        self._backend.drop_index('document_%s' % column, 'value')
        self._backend.drop_table(self._table_name(column))

    def get_indexes_for_columns(self, columns):
        """Returns the names of existing indexes for the given columns.
        """
        index_names = [self._index_name(col) for col in columns]
        if not index_names:
            return []
        # Names are derived from document keys, so bind them as
        # parameters instead of quoting them into the SQL text.
        placeholders = ','.join('?' for _ in index_names)
        return list(self._backend.get_indexes(
            'name in (%s)' % placeholders, params=index_names))

    def close(self):
        """Closes the backend connection.
        """
        self._backend.close()

    def _index_name(self, col):
        """Returns the SQL index name for column col."""
        return 'document_%s_value_idx' % col

    def _table_name(self, table):
        """Returns the index table name for the given column."""
        return 'document_%s_value' % table

    def _column_from_index(self, idx):
        """Inverse of _index_name: strips the fixed prefix and suffix."""
        return idx[len('document_'):-len('_value_idx')]

    def _table_from_index(self, idx):
        """Returns the index table name for an index name."""
        return idx[:-len('_idx')]
class Cursor(object):
    """Placeholder for a filterable view over a Database.

    Only the constructor stores anything; every other method is a stub
    that returns None.
    """

    def __init__(self, db):
        # Keep a reference to the database this cursor belongs to.
        self._db = db

    def where(self, spec):
        """Stub: accepts a spec and does nothing."""
        return None

    def range(self, max_or_start, max=None):
        """Stub: works out a (start, max) window and discards it.

        With one argument the window starts at 0; with two, the first
        argument is the start.  A falsy max counts as "not given".
        """
        if not max:
            start, max = 0, max_or_start
        else:
            start = max_or_start
        return None

    def sort(self, desc):
        """Stub: accepts a sort direction and does nothing."""
        return None

    def count(self):
        """Stub: does nothing."""
        return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment