Skip to content

Instantly share code, notes, and snippets.

@NightTsarina
Created November 17, 2020 01:13
Show Gist options
  • Save NightTsarina/e6bb8734bc15f333ccdcd6ae7d31893f to your computer and use it in GitHub Desktop.
Save NightTsarina/e6bb8734bc15f333ccdcd6ae7d31893f to your computer and use it in GitHub Desktop.
LMDB crash test
#!/usr/bin/env python3
# vim:ts=2:sw=2:et:ai:sts=2
import lmdb
import logging
from multiprocessing import Process, get_logger, log_to_stderr
import os
import random
import sys
import tempfile
import time
LOGGER = get_logger()
def dbopen(dbpath, use_writemap, set_mapsize0, resize=False):
  """Open the LMDB environment at dbpath, optionally adjusting its map size.

  Args:
    dbpath: Path of the database file (opened with subdir=False).
    use_writemap: Passed through as the environment's writemap option.
    set_mapsize0: If true and resize is false, call set_mapsize(0) after
      opening.
    resize: If true, double the map size reported by stats() and then
      commit a small marker record (b'_resized').

  Returns:
    The open lmdb.Environment.

  Raises:
    Whatever lmdb raises while inspecting or resizing the environment; the
    environment is closed before the exception propagates.
  """
  dbenv = lmdb.Environment(
      dbpath, subdir=False, create=True, readahead=False,
      writemap=use_writemap)
  try:
    map_size, _ = stats(dbenv)
    if resize:
      map_size *= 2
      LOGGER.info('Resizing to %d.', map_size)
      dbenv.set_mapsize(map_size)
      # NOTE(review): presumably this write exists so the new map size gets
      # committed to the file -- confirm against the LMDB documentation.
      with dbenv.begin(write=True) as txn:
        txn.put(b'_resized', b'1')
    elif set_mapsize0:
      # NOTE(review): LMDB is expected to treat a size of 0 as "re-read the
      # current map size" -- confirm against the LMDB documentation.
      dbenv.set_mapsize(0)
  except BaseException:
    # Do not leak the freshly opened handle when setup fails.
    dbenv.close()
    raise
  return dbenv
def stats(dbenv):
  """Log the environment's map size and used size, and return both.

  The used size is computed as the page size multiplied by (last used page
  number + 1). An extra log line is emitted when that value exceeds the map
  size.

  Args:
    dbenv: An open environment providing stat() and info().

  Returns:
    A (map_size, used_size) tuple.
  """
  env_stat = dbenv.stat()
  env_info = dbenv.info()
  map_size = env_info['map_size']
  used_size = env_stat['psize'] * (env_info['last_pgno'] + 1)
  LOGGER.info('LMDB stats: map_size=%d, used_size=%d', map_size, used_size)
  if map_size < used_size:
    LOGGER.info('!!!! map_size < used_size')
  return map_size, used_size
def task(dbpath, use_writemap, set_mapsize0, task_nr):
  """Worker body: write a stream of records, reopening the map on demand.

  Odd-numbered tasks run 10000 iterations, each writing a batch of 2-99
  values of 100-999 bytes in a single transaction. Even-numbered tasks run
  100000 iterations, each writing one value of 100-9999 bytes.

  Every batch is attempted up to three times. On MapFullError the
  environment is closed and reopened with resize=True; on MapResizedError it
  is closed and reopened without resizing.

  Args:
    dbpath: Path of the database file, forwarded to dbopen().
    use_writemap: Forwarded to dbopen().
    set_mapsize0: Forwarded to dbopen().
    task_nr: Task number; its parity selects the workload.

  Raises:
    RuntimeError: If a batch could not be written after 3 attempts.
  """
  LOGGER.info('Process started')
  dbenv = dbopen(dbpath, use_writemap, set_mapsize0)
  try:
    odd_task = task_nr % 2
    count = 10000 if odd_task else 100000
    for x in range(count):
      if odd_task:
        job = [
            (b'%3d_%2d' % (x, n), b'a' * random.randrange(100, 1000))
            for n in range(random.randrange(2, 100))
        ]
      else:
        job = [(b'%4d' % x, b'a' * random.randrange(100, 10000))]
      for _ in range(3):
        try:
          with dbenv.begin(write=True) as txn:
            for key, value in job:
              txn.put(key, value)
              LOGGER.debug('item saved (size=%d)', len(value))
          break
        except lmdb.MapFullError:
          LOGGER.info('** MDB_MAP_FULL')
          dbenv.close()
          dbenv = dbopen(dbpath, use_writemap, set_mapsize0, resize=True)
        except lmdb.MapResizedError:
          LOGGER.info('** MDB_MAP_RESIZED')
          dbenv.close()
          dbenv = dbopen(dbpath, use_writemap, set_mapsize0, resize=False)
      else:
        # The retry loop ran out without reaching the break above.
        raise RuntimeError('Could not write after 3 attempts')
  except BaseException:
    # Deliberately broad: log whatever ends the worker, then re-raise it
    # unchanged.
    LOGGER.exception('Process raised an exception')
    raise
  finally:
    LOGGER.info('Process finished')
    # Release the environment. If a reopen above failed, dbenv still refers
    # to the already closed handle; py-lmdb documents repeated close() calls
    # as having no effect.
    dbenv.close()
def procs_ok(procs):
  """Return True if every process in procs is alive, False otherwise.

  Stops at the first process that is not alive, logs how it ended (exit
  code, or the signal number when the exit code is negative) and returns
  False.

  Args:
    procs: Iterable of objects exposing is_alive(), exitcode and name, such
      as multiprocessing.Process instances.
  """
  for proc in procs:
    if proc.is_alive():
      continue
    exitcode = proc.exitcode
    if exitcode is None:
      # No exit code is available (for example the process was never
      # started); comparing None with 0 would raise TypeError.
      LOGGER.info('Process %s is not alive and has no exit code', proc.name)
    elif exitcode >= 0:
      LOGGER.info('Process %s exited with code %d', proc.name, exitcode)
    else:
      LOGGER.info('Process %s exited with signal %d', proc.name, -exitcode)
    return False
  return True
# Script entry point: start the worker processes against a database in a
# temporary directory, wait until the first one dies, terminate the rest and
# exit with the status of the process that died.
if __name__ == '__main__':
  if len(sys.argv) != 3:
    # A usage error must not look like a successful run to the caller.
    print('%s <use_writemap> <call set_mapsize(0)>' % sys.argv[0],
          file=sys.stderr)
    sys.exit(2)
  use_writemap = int(sys.argv[1])
  set_mapsize0 = int(sys.argv[2])
  logger = log_to_stderr()
  logger.setLevel(logging.INFO)
  with tempfile.TemporaryDirectory() as dbdir:
    dbpath = os.path.join(dbdir, 'test')
    nproc = 2
    procs = [
        Process(target=task, args=(dbpath, use_writemap, set_mapsize0, n))
        for n in range(nproc)
    ]
    for proc in procs:
      proc.start()
    exit_status = None
    while True:
      for proc in procs:
        if not proc.is_alive():
          LOGGER.info('Process %s died with status %d.', proc.name,
                      proc.exitcode)
          exit_status = proc.exitcode
          break
        # multiprocessing does not handle well processes killed by the
        # kernel, so check using waitpid.
        pid, status = os.waitpid(proc.pid, os.WNOHANG)
        if pid:
          exitcode = os.waitstatus_to_exitcode(status)
          LOGGER.error('Process %s died unexpectedly with status %d.', pid,
                       exitcode)
          exit_status = exitcode
          break
        # otherwise, it is still alive
      else:
        # Every worker is still running; poll again in a second.
        time.sleep(1)
        continue
      if exit_status is None:
        exit_status = 1
      break
    # Stop whichever workers are still running before the temporary
    # directory is removed.
    for proc in procs:
      if proc.is_alive():
        proc.terminate()
        proc.join()
  sys.exit(exit_status)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment