Skip to content

Instantly share code, notes, and snippets.

@georgepsarakis
Created October 28, 2014 09:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save georgepsarakis/b42f5cf59d45501b9c88 to your computer and use it in GitHub Desktop.
Save georgepsarakis/b42f5cf59d45501b9c88 to your computer and use it in GitHub Desktop.
Simple SQLite-backed key-value storage REST API. Built with Flask & flask-restful.
import os
import sqlite3
from hashlib import md5
from time import time
import simplejson as json
from flask import Flask
from flask.ext import restful
from flask import g
from flask import request
# Flask application and the flask-restful API object the resources are
# registered on.
app = Flask(__name__)
api = restful.Api(app)
# The SQLite database file lives in the same directory as this module.
PATH = os.path.dirname(os.path.abspath(__file__))
DATABASE = os.path.join(PATH, 'data.db')
# DDL for the registry table that records each index name together with the
# hashed table name used to store its entries.
CREATE_INDEX_LIST = """
CREATE TABLE IF NOT EXISTS indexes(name TEXT PRIMARY KEY, hash TEXT)
"""
# DDL template for the table backing a single index; %(index)s is replaced
# with the table name before execution.
CREATE_INDEX = """
CREATE TABLE IF NOT EXISTS
%(index)s(key TEXT,
key_hash TEXT UNIQUE,
value BLOB,
modified REAL
)
"""
# In-process cache mapping an index name to its hashed table name.
INDEX_CACHE = {}
def get_db():
    """Return the SQLite connection cached on ``flask.g``, opening it if needed.

    The connection is created with ``isolation_level=None`` and stored as
    ``g._database`` so later calls in the same context reuse it.
    """
    connection = getattr(g, '_database', None)
    if connection is not None:
        return connection
    connection = sqlite3.connect(DATABASE, isolation_level=None)
    g._database = connection
    return connection
def query_db(query, args=(), one=False):
    """Execute *query* with *args* on the shared connection and return its result.

    The statement type is taken from the first word of *query*.

    Args:
        query: SQL text, optionally containing ``?`` placeholders.
        args: Sequence of values bound to the placeholders.
        one: For SELECT statements, return only the first row when at least
            one row was found.

    Returns:
        * SELECT: the list of rows, or the first row when *one* is true and
          the list is non-empty (an empty list is returned as-is).
        * UPDATE / INSERT / REPLACE / DELETE: the cursor's ``rowcount``.
        * Any other statement (e.g. CREATE) or an empty query: ``None``.
    """
    db = get_db()
    cursor = db.execute(query, args)
    try:
        words = query.split()
        # An empty/blank query has no leading keyword; treat it as "other".
        query_type = words[0].lower() if words else ""
        result = None
        if query_type == "select":
            result = cursor.fetchall()
        elif query_type in ("update", "insert", "replace", "delete"):
            # DELETE must report its row count too, otherwise callers cannot
            # tell whether anything was actually removed.
            result = cursor.rowcount
            db.commit()
    finally:
        # Release the cursor even when fetching or committing fails.
        cursor.close()
    if query_type == "select" and result and one:
        return result[0]
    return result
def manage_index(name):
    """Make sure the index *name* exists and return its cached table name.

    An index that is already known gets its cache entry refreshed; an unknown
    one is created first (creation fills the cache entry itself).
    """
    if index_exists(name):
        INDEX_CACHE[name] = index_hash(name)
    else:
        create_index(name)
    return INDEX_CACHE[name]
def h(value):
    """Return the hexadecimal MD5 digest of *value*.

    Byte strings are hashed as-is. Text is encoded as UTF-8 (characters that
    cannot be encoded are dropped). Any other object is converted to text
    first. Digests for text and ASCII byte strings are unchanged from the
    previous implementation, so existing table names and key hashes stay valid.

    Args:
        value: Byte string, text string, or any object convertible to text.

    Returns:
        A 32-character lowercase hexadecimal string.
    """
    if isinstance(value, bytes):
        # Hash raw bytes directly; calling .encode() on a byte string would
        # force an implicit ASCII decode (Python 2) or fail outright (Python 3).
        payload = value
    else:
        # type(u'') is `unicode` on Python 2 and `str` on Python 3, which
        # avoids referencing names that exist on only one of them.
        text_type = type(u'')
        if not isinstance(value, text_type):
            value = text_type(value)
        payload = value.encode('utf-8', 'ignore')
    return md5(payload).hexdigest()
def index_hash(name):
    """Return the table name for index *name*: ``index_`` plus the hash of the name."""
    digest = h(name)
    return "index_" + digest
def create_index(name):
    """Create the table for index *name*, cache its table name and register it.

    The table is created (if missing), the cache entry is written, and then a
    ``(name, table name)`` row is inserted into the ``indexes`` table.
    """
    table = index_hash(name)
    ddl = CREATE_INDEX % {'index': table}
    query_db(ddl)
    INDEX_CACHE[name] = table
    query_db("INSERT INTO indexes VALUES(?, ?)", (name, table))
def index_exists(name):
    """Tell whether index *name* is known.

    Returns ``True`` when the name is in the cache; otherwise returns the
    rows found in ``sqlite_master`` for the index's table name (an empty,
    falsy result when the table does not exist).
    """
    if name not in INDEX_CACHE:
        lookup = (
            "SELECT name FROM sqlite_master WHERE\n"
            "type='table' AND name= ? COLLATE NOCASE"
        )
        return query_db(lookup, (index_hash(name),))
    return True
@app.before_first_request
def create_index_list():
    """Create the ``indexes`` registry table before the first request is served."""
    ddl = CREATE_INDEX_LIST
    query_db(ddl)
@app.teardown_appcontext
def close_connection(exception):
    """Close the connection cached on ``flask.g``, if one was opened.

    Args:
        exception: Passed in by Flask on teardown; not used here.
    """
    connection = getattr(g, '_database', None)
    if connection is None:
        return
    connection.close()
def index_store(index, key, value):
    """Insert or replace the entry for *key* in the table *index*.

    Args:
        index: Table name to write to.
        key: Entry key; its hash is stored alongside it.
        value: Payload to store.

    Returns:
        A ``(key, result)`` tuple where ``result`` is whatever ``query_db``
        returned for the statement.
    """
    hashed_key = h(key)
    statement = "INSERT OR REPLACE INTO %s(key, key_hash, value, modified) VALUES(?, ?, ?, ?)" % index
    params = (key, hashed_key, value, time())
    return key, query_db(statement, params)
class Home(restful.Resource):
    """Root resource; GET and POST both answer with a fixed greeting payload."""

    def get(self):
        """Return the greeting payload."""
        payload = {"hello": "world"}
        return payload

    def post(self):
        """Return the same greeting payload as GET."""
        payload = {"hello": "world"}
        return payload
class Index(restful.Resource):
    """REST resource for one key inside a named index.

    Every handler receives the index name and the key from the URL and
    resolves the index name to its table name through ``manage_index``.
    """

    def get(self, index, key):
        """Return the stored entry for *key*, or abort with 404 if there is none.

        Returns:
            A dict with ``key``, ``value`` and ``modified`` taken from the row.
        """
        table = manage_index(index)
        row = query_db('SELECT * FROM %s WHERE key_hash = ?' % table, (h(key),), True)
        if not row:
            restful.abort(404)
        value = row[2]
        # The raw request body is what gets stored; when it comes back as a
        # byte string distinct from str (Python 3) it must be decoded, since
        # bytes cannot be serialised into the JSON response.
        if isinstance(value, bytes) and not isinstance(value, str):
            value = value.decode('utf-8', 'replace')
        return {"key": row[0], "value": value, "modified": row[3]}

    def delete(self, index, key):
        """Delete the entry for *key* and report whether a row was removed.

        Returns:
            A dict with ``key`` and a boolean ``deleted``.
        """
        table = manage_index(index)
        affected = query_db('DELETE FROM %s WHERE key_hash = ?' % table, (h(key),))
        # Guard against a missing row count so the comparison never raises.
        return {'key': key, 'deleted': affected is not None and affected > 0}

    def post(self, index, key):
        """Store the raw request body under *key*; abort with 400 on a bad body.

        Returns:
            A dict with ``key`` and a boolean ``updated``.
        """
        table = manage_index(index)
        try:
            raw = request.get_data()
            # Parsed only to validate the JSON; the raw body is what is stored.
            request.get_json()
        except Exception:
            # Exception (not a bare except) so SystemExit/KeyboardInterrupt
            # are not turned into a 400 response.
            restful.abort(400)
        key, stored = index_store(table, key, raw)
        return {'key': key, "updated": stored is not None and stored > 0}
# Route table: "/" is served by Home, "/<index>/<key>" by Index.
api.add_resource(Home, '/')
api.add_resource(Index, '/<string:index>/<string:key>')

if __name__ == '__main__':
    # NOTE(review): debug=True turns on Flask's debug mode; confirm this entry
    # point is only used for local development.
    app.run(debug=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment