Skip to content

Instantly share code, notes, and snippets.

@keisukefukuda
Created August 29, 2014 17:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save keisukefukuda/08661a64f24af0f90bdd to your computer and use it in GitHub Desktop.
Save keisukefukuda/08661a64f24af0f90bdd to your computer and use it in GitHub Desktop.
A very simple task management system using proper locking on NFS
import random
import os, os.path, sys
import time
import itertools
class TaskAssigner:
def __init__(self, *arrays, **kwd):
print arrays
self.arrays = arrays
if 'workdir' in kwd:
self.workdir = kwd['workdir']
del kwd['workdir']
else:
self.workdir = os.getcwd()
if 'datafile' in kwd:
self.datafile = kwd['datafile']
del kwd['datafile']
else:
self.datafile = os.path.join(self.workdir, "nfslock.db.json")
if 'lockfile' in kwd:
self.lockfile = kwd['lockfile']
del kwd['lockfile']
else:
self.lockfile = os.path.join(self.workdir, ".lockfile")
if len(kwd) != 0:
raise RuntimeError("Unknown keyword arguments to TaskAssigner(): " + str(kwd.keys()))
def create_random(self):
rdm = ""
if 'PBS_JOBID' in os.environ:
rdm = "lock." + os.environ["PBS_JOBID"]
else:
rdm = "lock.interactive"
rdm += ".%08d" % random.randint(1, 100000000)
return rdm
def get_task(self):
if not os.path.exists(self.datafile):
os.system("touch %s" % self.datafile)
self.aquire_lock()
with open(self.datafile, 'r') as f:
lines = [ln.strip() for ln in f.readlines()]
for t in itertools.product(*self.arrays):
if str(t) in lines:
continue
else:
with open(self.datafile, 'a') as f:
f.write(str(t) + "\n")
self.release_lock()
return t
self.release_lock()
return None
def aquire_lock(self):
if not os.path.exists(self.lockfile):
os.system("touch %s" % self.lockfile)
while True:
self.random_id = self.create_random()
os.link(self.lockfile, self.random_id)
nlink = int(os.stat(self.lockfile).st_nlink)
if nlink == 2:
return True
if nlink > 2:
os.unlink(self.random_id)
time.sleep(random.random() + 0.5)
else:
RuntimeError('something is wrong')
def release_lock(self):
os.unlink(self.random_id)
self.random_id = None
if __name__ == "__main__":
time.sleep(random.random() * 3)
a = range(10)
b = ['a', 'b', 'c', 'd', 'e', 'f']
c = range(10)
ja = TaskAssigner(a, b, c)
while True:
t = ja.get_task()
if t is None: break
else:
print t, os.getpid()
time.sleep(0.2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment