Create a gist now

Instantly share code, notes, and snippets.

@coleifer /lsm_bench.py Secret
Last active Dec 7, 2015

What would you like to do?
import functools
import os
import shutil
import time

import bsddb3
import kyotocabinet as kc
import plyvel
from lsm import LSM
def timed(fn):
    """Decorate ``fn`` so that calling it returns the elapsed wall-clock time.

    The wrapped callable's own return value is discarded. The wrapper returns
    the number of seconds the call took (measured with ``time.time()``),
    rounded to three decimal places.

    Positional and keyword arguments are forwarded to ``fn`` unchanged, and
    ``functools.wraps`` keeps the wrapped callable's name and docstring.
    """
    @functools.wraps(fn)
    def inner(*args, **kwargs):
        started = time.time()
        fn(*args, **kwargs)
        return round(time.time() - started, 3)
    return inner
class Test(object):
    """Base class for a key/value store micro-benchmark.

    Subclasses are expected to provide:

    * ``filename`` -- class attribute with the on-disk path of the store,
    * ``get_db()`` -- returns the opened store, kept as ``self._db``,
    * ``get_range(low, high)`` -- performs a range scan between two keys.

    The default ``write``/``read`` use item assignment and item lookup on
    ``self._db``; subclasses override them when the store needs other calls.
    """

    def __init__(self, n):
        """Remove any previous store at ``filename`` and open a fresh one.

        ``n`` is the number of keys used by the write/read/range benchmarks.
        """
        self.n = n
        # Start from an empty store. A leftover store may be a single file or
        # a directory that still contains files, so a directory is removed
        # recursively (os.removedirs only handles empty directories).
        if os.path.isdir(self.filename):
            shutil.rmtree(self.filename)
        elif os.path.exists(self.filename):
            os.unlink(self.filename)
        self._db = self.get_db()

    def close(self):
        """Close the store if it exposes ``close()``; otherwise do nothing."""
        try:
            self._db.close()
        except AttributeError:
            # Best effort: an AttributeError (for instance a store object
            # without a close() method) is deliberately ignored.
            pass

    @timed
    def test_writes(self):
        """Write ``n`` zero-padded keys; ``timed`` returns elapsed seconds."""
        for i in range(self.n):
            self.write('%09d' % i)

    @timed
    def test_reads(self):
        """Read back the ``n`` keys; ``timed`` returns elapsed seconds."""
        for i in range(self.n):
            self.read('%09d' % i)

    @timed
    def test_range(self, r):
        """Run consecutive range scans, each spanning ``int(n * r)`` keys.

        ``r`` is the fraction of ``n`` covered by one scan. If ``int(n * r)``
        is 0 the ``range()`` call below raises ``ValueError`` (zero step).
        ``timed`` makes the call return the elapsed seconds.
        """
        span = int(self.n * r)
        for start in range(0, self.n - span, span):
            low = '%09d' % start
            high = '%09d' % (start + span)
            self.get_range(low, high)

    def write(self, i):
        """Store ``i`` under the key ``i`` using item assignment."""
        self._db[i] = i

    def read(self, i):
        """Return the value stored under key ``i`` using item lookup."""
        return self._db[i]
class BDBBTreeTest(Test):
    """Benchmark target backed by a ``bsddb3`` B-tree database file."""

    filename = '/tmp/bdb_btree.db'

    def get_db(self):
        """Open the database at ``filename`` with ``bsddb3.btopen``."""
        path = self.filename
        return bsddb3.btopen(path)

    def get_range(self, low, high):
        """Position the store at ``low`` and step until key ``high`` is seen.

        ``next()`` is called repeatedly and the records are discarded; the
        loop only ends when a returned key equals ``high`` (or ``next()``
        raises).
        """
        self._db.set_location(low)
        key, _value = self._db.next()
        while key != high:
            key, _value = self._db.next()
class KyotoBTreeTest(Test):
    """Benchmark target backed by a Kyoto Cabinet database (``.kct`` file)."""

    filename = '/tmp/kyoto_btree.kct'

    def get_db(self):
        """Open (creating if needed) the database at ``filename`` for writing.

        Raises:
            IOError: if ``DB.open`` reports failure.
        """
        db = kc.DB()
        # DB.open() signals failure through its return value instead of
        # raising, so check it here rather than benchmarking a dead handle.
        if not db.open(self.filename, kc.DB.OWRITER | kc.DB.OCREATE):
            raise IOError(
                'could not open %s: %s' % (self.filename, db.error()))
        return db

    def write(self, i):
        """Store ``i`` under the key ``i`` via ``DB.set``."""
        self._db.set(i, i)

    def read(self, i):
        """Return the result of ``DB.get`` for key ``i``."""
        return self._db.get(i)

    def get_range(self, low, high):
        """Jump a cursor to ``low`` and walk forward until key ``high``.

        Records are fetched and discarded. The cursor is disabled when the
        scan finishes or fails, so cursors do not pile up across the many
        scans a benchmark run performs.
        """
        cursor = self._db.cursor()
        try:
            cursor.jump(low)
            while True:
                k, v = cursor.get()
                if k == high:
                    break
                cursor.next()
        finally:
            cursor.disable()
class LevelDBTest(Test):
    """Benchmark target backed by LevelDB through ``plyvel``."""

    filename = '/tmp/leveldb.db'

    def get_db(self):
        """Open the database at ``filename``, creating it if missing."""
        path = self.filename
        return plyvel.DB(path, create_if_missing=True)

    def write(self, i):
        """Store ``i`` under the key ``i`` via ``put``."""
        key = value = i
        self._db.put(key, value)

    def read(self, i):
        """Return the result of ``get`` for key ``i``."""
        found = self._db.get(i)
        return found

    def get_range(self, low, high):
        """Exhaust ``iterator(start=low, stop=high)``, discarding records."""
        records = self._db.iterator(start=low, stop=high)
        for _key, _value in records:
            pass
class LSMTest(Test):
    """Benchmark target backed by the ``lsm`` package's ``LSM`` store."""

    filename = '/tmp/lsm.db'

    def get_db(self):
        """Open ``filename`` with multi-process support off, safety level 0."""
        options = {
            'enable_multiple_processes': False,
            'safety_level': 0,
        }
        return LSM(self.filename, **options)

    def get_range(self, low, high):
        """Iterate over the slice ``self._db[low:high]``, discarding items."""
        window = self._db[low:high]
        for _key, _value in window:
            pass
# Benchmark targets, run in this order.
tests = [
    BDBBTreeTest,
    KyotoBTreeTest,
    LevelDBTest,
    LSMTest,
]

# Number of keys written, read back and range-scanned per store.
N = 100000

# (label, ratio) pairs for the range-scan benchmarks; the ratio is passed to
# Test.test_range as the fraction of N covered by each scan.
RANGE_RUNS = (
    ('Range (10%): ', .1),
    ('Range (20%): ', .2),
    ('Range (40%): ', .4),
    ('Range (80%): ', .8),
)

# Every print below takes a single pre-formatted string, so the output is the
# same whether ``print`` is the Python 2 statement or the Python 3 function.
print('Testing with N = %s' % N)
print('------------------------------------')
print('')

for TestClass in tests:
    test_name = TestClass.__name__.replace('Test', '')
    print(test_name)
    print('~' * len(test_name))
    test = TestClass(N)
    try:
        print('%s %s' % ('Writes: ', test.test_writes()))
        print('%s %s' % ('Reads: ', test.test_reads()))
        for label, ratio in RANGE_RUNS:
            print('%s %s' % (label, test.test_range(ratio)))
        print('')
    finally:
        # Release the store even when one of the benchmark steps raises.
        test.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment