Skip to content

Instantly share code, notes, and snippets.

@phizaz
Last active June 7, 2018 16:55
Show Gist options
  • Save phizaz/690ffe8220a8290dfa8c30d56f22f0ea to your computer and use it in GitHub Desktop.
Save phizaz/690ffe8220a8290dfa8c30d56f22f0ea to your computer and use it in GitHub Desktop.
A simple memory (with storage) python cache system with live cache update.
from database import pickledb
from os.path import exists, join, dirname
from os import makedirs
from null import Null
import uuid
class CacheObject:
def __init__(self, cacher, key, val):
assert isinstance(cacher, Cacher)
self._cacher = cacher
self._uuid = uuid.uuid1().int
self._key = key
self._val = val
self._locked = False
def hit(self):
return not isinstance(self._val, Null)
def __del__(self):
self._cacher.unsubscribe(self)
def get(self):
return self._val
def set(self, val):
self._val = val
self.dump()
return val
def dump(self):
self._cacher.apply_change(self._key, self._val)
def lock(self):
self._locked = True
def unlock(self):
self._locked = False
def _on_change(self, val):
assert not self._locked
self._val = val
class Cacher:
def __init__(self, namespace):
self.namespace = namespace
self._subscription = dict()
db_path = join(dirname(__file__), 'databases')
if not exists(db_path):
makedirs(db_path)
db_file = join(db_path, '{}.db'.format(namespace))
self._db = pickledb(db_file, True)
def key(self, key):
# get a cache object of the key
val = self._get(key)
obj = CacheObject(self, key, val)
self.subscribe(key, obj)
return obj
def subscribe(self, key, obj):
assert isinstance(obj, CacheObject)
if key not in self._subscription:
sub = self._subscription[key] = dict()
else:
sub = self._subscription[key]
id = obj._uuid
sub[id] = obj
def unsubscribe(self, obj):
assert isinstance(obj, CacheObject)
key = obj._key
id = obj._uuid
del self._subscription[key][id]
def apply_change(self, key, val):
# update change to database
self._set(key, val)
# notify change to all objects that subscribe
sub = self._subscription[key]
for id, obj in sub.items():
obj._on_change(val)
# PRIVTAES
def _get(self, key):
return self._db.get(key)
def _set(self, key, val):
self._db.set(key, val)
return val
#!/usr/bin/env python
# IMPORTANT: this source code is not the original one. Changes have been made by Konpat Preechakul
# Copyright (c) 2015, Harrison Erd
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# Neither the name of the Harrison Erd nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "
# AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS
# BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
# THE POSSIBILITY OF SUCH DAMAGE.
import os
import simplejson
from database_worker import Worker
from null import Null
def load(location, option):
'''Return a pickledb object. location is the path to the json file.'''
return pickledb(location, option)
class pickledb(object):
def __init__(self, location, option):
'''Creates a database object and loads the data from the location path.
If the file does not exist it will be created on the first update.'''
self.load(location, option)
# won't write more often than 'debonuce' option
# worker will make writing database uses less effrot
self.worker = Worker(cnt=1, want_return_value=False, debounce=5000)
def __del__(self):
self._dumpdb_sync()
def load(self, location, option):
'''Loads, reloads or changes the path to the db file.'''
location = os.path.expanduser(location)
self.loco = location
self.fsave = option
if os.path.exists(location):
self._loaddb()
else:
self.db = {}
return True
def dump(self):
'''Force dump memory db to file.'''
self._dumpdb(True)
return True
def set(self, key, value):
'''Set the (string,int,whatever) value of a key'''
self.db[key] = value
self._dumpdb(self.fsave)
return True
def get(self, key):
'''Get the value of a key'''
try:
return self.db[key]
except KeyError:
# return Null object to indicate that it doesn't exist
return Null()
def getall(self):
'''Return a list of all keys in db'''
return self.db.keys()
def rem(self, key):
'''Delete a key'''
del self.db[key]
self._dumpdb(self.fsave)
return True
def lcreate(self, name):
'''Create a list'''
self.db[name] = []
self._dumpdb(self.fsave)
return True
def ladd(self, name, value):
'''Add a value to a list'''
self.db[name].append(value)
self._dumpdb(self.fsave)
return True
def lextend(self, name, seq):
'''Extend a list with a sequence'''
self.db[name].extend(seq)
self._dumpdb(self.fsave)
return True
def lgetall(self, name):
'''Return all values in a list'''
return self.db[name]
def lget(self, name, pos):
'''Return one value in a list'''
return self.db[name][pos]
def lrem(self, name):
'''Remove a list and all of its values'''
number = len(self.db[name])
del self.db[name]
self._dumpdb(self.fsave)
return number
def lpop(self, name, pos):
'''Remove one value in a list'''
value = self.db[name][pos]
del self.db[name][pos]
self._dumpdb(self.fsave)
return value
def llen(self, name):
'''Returns the length of the list'''
return len(self.db[name])
def append(self, key, more):
'''Add more to a key's value'''
tmp = self.db[key]
self.db[key] = ('%s%s' % (tmp, more))
self._dumpdb(self.fsave)
return True
def lappend(self, name, pos, more):
'''Add more to a value in a list'''
tmp = self.db[name][pos]
self.db[name][pos] = ('%s%s' % (tmp, more))
self._dumpdb(self.fsave)
return True
def dcreate(self, name):
'''Create a dict'''
self.db[name] = {}
self._dumpdb(self.fsave)
return True
def dadd(self, name, pair):
'''Add a key-value pair to a dict, "pair" is a tuple'''
self.db[name][pair[0]] = pair[1]
self._dumpdb(self.fsave)
return True
def dget(self, name, key):
'''Return the value for a key in a dict'''
return self.db[name][key]
def dgetall(self, name):
'''Return all key-value pairs from a dict'''
return self.db[name]
def drem(self, name):
'''Remove a dict and all of its pairs'''
del self.db[name]
self._dumpdb(self.fsave)
return True
def dpop(self, name, key):
'''Remove one key-value pair in a dict'''
value = self.db[name][key]
del self.db[name][key]
self._dumpdb(self.fsave)
return value
def dkeys(self, name):
'''Return all the keys for a dict'''
return self.db[name].keys()
def dvals(self, name):
'''Return all the values for a dict'''
return self.db[name].values()
def dexists(self, name, key):
'''Determine if a key exists or not'''
if self.db[name][key] is not None:
return 1
else:
return 0
def deldb(self):
'''Delete everything from the database'''
self.db= {}
self._dumpdb(self.fsave)
return True
def _loaddb(self):
'''Load or reload the json info from the file'''
self.db = simplejson.load(open(self.loco, 'rb'))
def _dumpdb(self, forced):
'''Write/save the json dump into the file'''
if forced:
# write only the latest if many stacked in the queue
self.worker.clear_job()
self.worker.add_job(self._dumpdb_sync)
def _dumpdb_sync(self):
simplejson.dump(self.db, open(self.loco, 'wt'))
from Queue import Queue
from threading import Thread, Event
import time
'''
A single worker with queue and promise pattern
usage: worker.add_job(fn).then(callback_fn)
note: this might be anti-pattern
'''
class Promise:
def __init__(self):
self.val = None
self.success = False
self.callback = None
self.event = Event()
def done(self, value):
# print('job done:', value)
self.val = value
self.event.set()
self.event.clear()
if self.callback:
self.callback(value)
def wait(self):
self.event.wait()
def then(self, callback):
self.callback = callback
def is_ready(self):
return self.success is True
class Worker:
def __init__(self, cnt=1, want_return_value=True, debounce=0):
self.queue = Queue()
self.threads = self._init_threads(cnt)
self.want_return_value = want_return_value
self.debounce = debounce
if want_return_value:
self.results = []
def add_job(self, fn):
promise = Promise()
if self.want_return_value:
idx = len(self.results)
else:
idx = -1
self.queue.put((idx, fn, promise))
if self.want_return_value:
self.results.append(None)
return promise
def clear_job(self):
# clear jobs in the queue
while not self.queue.empty():
self.queue.get()
self.queue.task_done()
def wait(self):
self.queue.join()
if self.want_return_value:
return self.results
def _worker(self):
while True:
i, fn, promise = self.queue.get()
r = fn()
if self.want_return_value:
self.results[i] = r
promise.done(r)
self.queue.task_done()
if self.debounce > 0:
time.sleep(self.debounce / 1000.0)
def _init_threads(self, cnt):
threads = []
for i in range(cnt):
thread = Thread(target=self._worker)
threads.append(thread)
thread.daemon = True
thread.start()
return threads
class Null:
def __init__(self):
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment