Skip to content

Instantly share code, notes, and snippets.

@tailhook
Created January 13, 2011 16:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tailhook/778115 to your computer and use it in GitHub Desktop.
Save tailhook/778115 to your computer and use it in GitHub Desktop.
from __future__ import print_function
import unittest
"""
Proof of concept of versionning database with minimum memory leaked.
Backend, which keeps data on disk should be still just transactional key-value
storage.
We use numbers as uids and fixed strings as value. In each
subsequent transaction we modify first character and character with number of
transaction of a node value to easily verify them easily (see ``update()``).
We do modifications in four passes:
* modify ``FIRSTWAVE`` items
* forget ``FIRSTVERIFY`` items, verifying each step
* modify ``SECONDWAVE`` items
* forget ``SECONDVERIFY`` items, verifying each step
We only test update operation, since one can emulate delete with some kind of
update.
"""
NODES = 100
TRANSACTIONS = 15
FIRSTWAVE = 10
FIRSTVERIFY = 6
SECONDWAVE = TRANSACTIONS - FIRSTWAVE
SECONVERIFY = TRANSACTIONS - FIRSTVERIFY
TRANCHARS = '0123456789abcdefghjklmnopqrstuvwxyz'
class ProofDB(object):
"""
This is 100% right database, it keeps all transactions and both oldest
and newest versions of all nodes. Don't forget anything
"""
def __init__(self, dic):
self.old = dic.copy()
self.new = dic.copy()
self.tran = {}
self.newest_tran = 0
self.oldest_tran = None
def get(self, key):
return self.new[key]
def forget(self, tran):
pass
def get_by_tran(self, key, tran):
assert self.oldest_tran <= tran <= self.newest_tran,\
"{0} <= {1} <= {2}".format(self.oldest_tran, tran, self.newest_tran)
if tran == self.newest_tran:
return self.get(key)
elif tran < self.oldest_tran:
raise KeyError(key, tran)
for i in xrange(tran, self.oldest_tran-1, -1):
if key in self.tran[i]:
return self.tran[i][key]
return self._fallback(key)
def _fallback(self, key):
return self.old[key]
def set_nodes(self, dic):
self.newest_tran += 1
self.tran[self.newest_tran] = dic.copy()
if self.oldest_tran is None:
self.oldest_tran = self.newest_tran
self.new.update(dic)
def print_line(self, name, line):
print('{0:3s}: '.format(str(name).upper()), end='')
for i in xrange(NODES):
if i in line:
print(line[i][0], end='')
else:
print(' ', end='')
print()
def print(self):
self.print_line('NEW', self.new)
for i in xrange(self.newest_tran, self.oldest_tran-1, -1):
self.print_line(i, self.tran[i])
self.print_line('OLD', getattr(self, 'old', {}))
print('-'*(5+NODES))
self.print_line('NEW', self.new)
for i in xrange(self.newest_tran, self.oldest_tran-1, -1):
self.print_line(i, dict((j, self.get_by_tran(j, i))
for j in xrange(NODES)))
self.print_line('OLD', getattr(self, 'old', {}))
def print_line(self, name, line):
print('{0:3s}: '.format(str(name).upper()), end='')
for i in xrange(NODES):
if i in line:
print(line[i][0], end='')
else:
print(' ', end='')
print()
class LeakyDB(ProofDB):
"""This db tries to forget, but still have copies of all nodes"""
def forget(self, tran):
assert tran == self.oldest_tran
self.old.update(self.tran[tran])
self.oldest_tran += 1
del self.tran[tran]
class IncrementalDB(ProofDB):
"""This database stores all old nodes, and only changed new ones"""
def __init__(self, dic):
super(IncrementalDB, self).__init__(dic)
self.new = {}
def forget(self, tran):
assert tran == self.oldest_tran
self.old.update(self.tran[tran])
# need to forget something from new
self.oldest_tran += 1
for i in self.tran[tran]:
for t in xrange(self.newest_tran, self.oldest_tran-1, -1):
if i in self.tran[t]:
break
else:
del self.new[i]
del self.tran[tran]
def get(self, key):
if key in self.new:
return self.new[key]
else:
return self.old[key]
def set_nodes(self, dic):
self.newest_tran += 1
self.tran[self.newest_tran] = dic.copy()
if self.oldest_tran is None:
self.oldest_tran = self.newest_tran
self.new.update(dic)
class GoodDB(ProofDB):
"""This database stores all new nodes, and only old ones than changed in
subsequent transactions"""
def __init__(self, dic):
super(GoodDB, self).__init__(dic)
self.old = {}
def forget(self, tran):
assert tran == self.oldest_tran
# need to forget something from old
self.oldest_tran += 1
for i in self.tran[tran]:
for t in xrange(self.newest_tran, self.oldest_tran-1, -1):
if i in self.tran[t]:
self.old[i] = self.tran[tran][i]
break
else:
del self.old[i]
del self.tran[tran]
def set_nodes(self, dic):
self.newest_tran += 1
self.tran[self.newest_tran] = dic.copy()
if self.oldest_tran is None:
self.oldest_tran = self.newest_tran
for i in dic:
if i not in self.old:
self.old[i] = self.new[i]
self.new.update(dic)
def _fallback(self, key):
if key in self.old:
return self.old[key]
else:
return self.new[key]
class MirrorDB(object):
def __init__(self, *db):
self.dbs = db
def get(self, key):
vals = [db.get(key) for db in self.dbs]
res = vals[0]
assert all(v == res for v in vals), (key, vals)
return res
def forget(self, tran):
for db in self.dbs:
db.forget(tran)
def get_by_tran(self, key, tran):
vals = [db.get_by_tran(key, tran) for db in self.dbs]
res = vals[0]
assert all(v == res for v in vals), (key, tran, vals)
return res
def set_nodes(self, dic):
for db in self.dbs:
db.set_nodes(dic)
def print(self):
for db in self.dbs:
db.print()
def mirror(*classes):
def constructor(self, val): #sorry for self :)
vals = [c(val) for c in classes]
return MirrorDB(*vals)
return constructor
class TestProofDB(unittest.TestCase):
db_class = ProofDB
def update(self, str, index):
return TRANCHARS[index] + str[1:index] + TRANCHARS[index] + str[index+1:]
def verify(self, db, tran, amount_changed, amount_old):
"""
We need to check that version of all nodes is not higher than specified.
We also check that only ``amount_changed`` bytes of it are modified
(we currently modify them not more than twice in first pass and four
times in second).
Checking how many nodes changed in exacly this transaction also helps.
``amount_old`` allows to check how many old transactions influenced
nodes in this snapshot.
"""
count = 0
othercount = 0
for i in xrange(NODES):
n = db.get_by_tran(i, tran)
if n[0] == '0':
pass #ok
else:
if n[0] == TRANCHARS[tran]:
count += 1
else:
othercount += 1
self.assertTrue(TRANCHARS.index(n[0]) <= tran)
self.assertTrue(TRANSACTIONS-amount_changed <= n.count('0'))
self.assertTrue(n.count('0') <= TRANSACTIONS-1)
self.assertEquals(count, 10, "VAL {0}".format(tran))
if amount_old:
self.assertEquals(othercount, (tran-1)*amount_old)
def check(self, DB):
db = DB(dict((i, '0'*(TRANSACTIONS+1))
for i in xrange(NODES)))
# setting first 10 nodes
print("Doing first pass...")
for tran in xrange(1, FIRSTWAVE+1):
db.set_nodes(dict((i, self.update(db.get(i), tran))
for i in xrange(tran*5-5, tran*5+5)))
print("Database state:")
db.print()
print('Verifying...')
for i in xrange(1, FIRSTWAVE+1):
self.verify(db, i, 2, 5)
print('Starting to forget...')
for ftran in xrange(1, FIRSTVERIFY+1):
db.forget(ftran)
print('Forgotten {0}. Verifying...'.format(ftran))
for i in xrange(ftran+1, FIRSTWAVE+1):
self.verify(db, i, 2, 5)
if ftran == 3:
print("Three forgotten, look at results:")
db.print()
print('Filling second time...')
for tran in xrange(FIRSTWAVE+1, TRANSACTIONS+1):
db.set_nodes(dict((i, self.update(db.get(i), tran))
for i in xrange(NODES - 7 - tran*5, NODES + 3 - tran*5)))
print("Database state:")
db.print()
print('Verifying...')
for i in xrange(FIRSTWAVE, TRANSACTIONS+1):
self.verify(db, i, 4, None)
print('Starting to forget...')
for ftran in xrange(FIRSTVERIFY+1, TRANSACTIONS+1):
db.forget(ftran)
print('Forgotten {0}. Verifying...'.format(ftran))
for i in xrange(ftran+1, TRANSACTIONS+1):
self.verify(db, i, 4, None)
if ftran == FIRSTVERIFY + 5:
print("Five forgotten, look at results:")
db.print()
print("All done. Results again:")
db.print()
print("Don't worry, be happy!")
def test_db(self):
self.check(self.db_class)
class TestLeakyDB(TestProofDB):
db_class = LeakyDB
class TestIncrementalDB(TestProofDB):
db_class = IncrementalDB
class TestGoodDB(TestProofDB):
db_class = GoodDB
class TestAll(TestProofDB):
db_class = mirror(ProofDB, LeakyDB, IncrementalDB, GoodDB)
if __name__ == '__main__':
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment