Skip to content

Instantly share code, notes, and snippets.

@matthewstory
Last active December 11, 2015 08:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save matthewstory/4576817 to your computer and use it in GitHub Desktop.
Save matthewstory/4576817 to your computer and use it in GitHub Desktop.
An example of how to coordinate writes from many forked processes to a single Xapian database, using a file lock to serialize the writes.
import fcntl
import time
import os
import sys
import shutil
import errno
import xapian as _x
# PIDs of the child forks the parent still has to reap with os.waitpid().
_pids = []
# Directory holding the Xapian database; the ".writelock" file used to
# serialize writers is created inside it too.
_DB = "./test.db"
# Number of child processes to fork; each one performs a single write.
# Each fork's loop index is reported back through its 8-bit exit status,
# so keep this well below 255.
_FORKS = 10
# Make sure our test is clean: start from an empty database directory so
# documents left by an earlier run cannot skew the results.
try:
    shutil.rmtree(_DB)
except OSError as e:
    # A missing directory is fine (e.g. first run); anything else is a real
    # error.  Bare ``raise`` keeps the original traceback (``raise e`` resets
    # it on Python 2).
    if e.errno != errno.ENOENT:
        raise
os.mkdir(_DB)
def _safe_db_do(db, attr, *args, **kwargs):
'''As the database may be modified at any time, we need to reopen, and
trap DatabaseModifiedError for each operation, and retry until
success'''
while True:
db.reopen()
try:
return getattr(db, attr)(*args, **kwargs)
except _x.DatabaseModifiedError:
pass
def _fork_term(fork_num):
'''Wrapper to create our fork-specific term'''
return "".join([ "XHELO", str(fork_num), ])
def _writable():
    '''Block until this process holds the write lock, then open the DB.

    Takes an exclusive ``flock`` on ``<_DB>/.writelock`` (creating the file
    if needed), so only one process at a time gets past this call.  It then
    opens the database at ``_DB`` for writing with ``DB_CREATE_OR_OPEN``.

    Returns:
        ``(lockfd, db)``: the raw file descriptor holding the lock and the
        open ``WritableDatabase``.  The caller owns both.  Close ``db``
        first, then ``os.close(lockfd)`` to release the lock.  That way no
        other process can try to open a writer while this one is still open.

    Raises:
        Whatever ``os.open``, ``fcntl.flock`` or ``WritableDatabase`` raise.
        If anything after the open fails, the descriptor is closed (which
        also drops the lock) before the exception propagates.
    '''
    # The lock file lives inside the DB directory, so every process agrees
    # on it.  Open it read/write: where Python emulates flock() with fcntl()
    # record locks, an exclusive lock requires a descriptor open for writing.
    lockfd = os.open(os.path.join(_DB, ".writelock"),
                     os.O_RDWR | os.O_CREAT, 0o666)
    try:
        fcntl.flock(lockfd, fcntl.LOCK_EX)
        return lockfd, _x.WritableDatabase(_DB, _x.DB_CREATE_OR_OPEN)
    except BaseException:
        # BaseException so an interrupt while blocked in flock() does not
        # leak the fd.  Bare ``raise`` keeps the original traceback
        # (``raise e`` resets it on Python 2).
        os.close(lockfd)
        raise
def _run_child(fork_num):
    '''Body of one forked child: write under the lock, then read back.

    Adds a document unique to this fork (docid ``fork_num + 2``).  It also
    overwrites the shared, contested document (docid 1) with the same term.
    After committing, it prints the terms a reader sees for both documents.
    The reads happen while the flock is still held, so no other fork using
    ``_writable()`` can commit in between.
    '''
    lockfd, writable = _writable()
    try:
        print("lock acquired by: %s" % os.getpid())
        readable = _x.Database(_DB)
        try:
            # sleep here only to exaggerate the serialization
            time.sleep(0.5)
            doc = _x.Document()
            term = _fork_term(fork_num + 2)
            doc.add_term(term)
            # add something new, and modify something contended
            writable.replace_document(fork_num + 2, doc)
            writable.replace_document(1, doc)
            # you must commit, or your changes may not be picked up by
            # other forks, when many are reading
            writable.commit()
            writable.close()
            new_doc = _safe_db_do(readable, 'get_document', fork_num + 2)
            for t in new_doc.termlist():
                print("new: %s; should be: %s" % (t.term, term))
            shared_doc = _safe_db_do(readable, 'get_document', 1)
            for t in shared_doc.termlist():
                print("contested: %s; should be: %s" % (t.term, term))
        finally:
            readable.close()
    finally:
        # Close the writer BEFORE releasing the flock.  On the error path the
        # writer may still be open, and dropping the flock first would let
        # the next fork try to open its own writer while this one still
        # holds it.  On the normal path this repeats the earlier close(),
        # as the original code did.
        try:
            writable.close()
        finally:
            os.close(lockfd)


for i in range(_FORKS):
    # Flush before forking so text still buffered in the parent is not
    # copied into, and re-emitted by, every child.
    sys.stdout.flush()
    sys.stderr.flush()
    pid = os.fork()
    if pid == 0:
        # child fork -- it must never fall out of this branch.  Otherwise it
        # would keep running the parent's loop and fork children of its own.
        # Exit with the i number, to determine (cheaply) which fork exited
        # last.
        status = i
        try:
            _run_child(i)
        except BaseException:
            sys.excepthook(*sys.exc_info())
            # 255 is never a valid fork index (assumes _FORKS < 255), so the
            # parent can tell a failed child from a successful one.
            status = 255
        finally:
            # os._exit() skips stdio flushing.  Flush by hand, or the child's
            # output is lost whenever stdout is a pipe or a file.
            try:
                sys.stdout.flush()
                sys.stderr.flush()
            finally:
                os._exit(status)
    else:
        _pids.append(pid)
# wait for the forks to cycle out.  Each child reports its loop index as its
# exit status, so the status of the last child to exit normally names the
# fork expected to have written the contested document (docid 1) last.
# NOTE: exit order is only a proxy for write order.  It holds here because
# each lock holder sleeps 0.5 s before writing, giving the previous holder
# time to exit.
last_exit = None
failures = 0
while _pids:
    pid, status = os.waitpid(-1, 0)
    _pids.remove(pid)
    if os.WIFEXITED(status) and os.WEXITSTATUS(status) < _FORKS:
        last_exit = os.WEXITSTATUS(status)
    else:
        # Killed by a signal, or exited with a code that is not a fork index.
        failures += 1
        sys.stderr.write("abnormal termination of fork: %s\n" % pid)

# demonstrate the contested value matches the last exiting fork
readable = _x.Database(_DB)
try:
    for i in range(_FORKS):
        for t in _safe_db_do(readable, 'get_document', i + 2).termlist():
            print("uncontested: %s; should be: %s"
                  % (t.term, _fork_term(i + 2)))
    # Without a normally-exited fork there is no expected contested value.
    if last_exit is not None:
        for t in _safe_db_do(readable, 'get_document', 1).termlist():
            print("contested: %s; should be: %s"
                  % (t.term, _fork_term(last_exit + 2)))
finally:
    readable.close()
# A non-zero status tells the caller (shell, CI) that some fork failed.
sys.exit(1 if failures else 0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment