Skip to content

Instantly share code, notes, and snippets.

@harlowja
Created December 1, 2015 17:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save harlowja/b4f0ddadbda1f92cc1e2 to your computer and use it in GitHub Desktop.
Save harlowja/b4f0ddadbda1f92cc1e2 to your computer and use it in GitHub Desktop.
cleaner.py
import logging
import os
import threading
import time
import fasteners
from six.moves import range as compat_range
LOG = logging.getLogger(__name__)
def _walk_maybe_paths(lock_dir, recurse=False, filter_func=os.path.isfile):
if os.path.isdir(lock_dir):
if recurse:
for (dir_path, _dir_names, file_names) in os.walk(lock_dir):
for fn in file_names:
fn_path = os.path.join(dir_path, fn)
if filter_func(fn_path):
yield fn_path
else:
for fn in os.listdir(lock_dir):
fn_path = os.path.join(lock_dir, fn)
if filter_func(fn_path):
yield fn_path
class CleanLocksJanitor(object):
def __init__(self, lock_dir, clean_delay=60.0,
acquire_timeout=0.1, acquire_attempts=3,
recurse=False, logger=None):
self.lock_dir = lock_dir
self.acquire_timeout = acquire_timeout
self.acquire_attempts = acquire_attempts
self.recurse = recurse
self.dead = threading.Event()
self.clean_delay = clean_delay
self.logger = logger or LOG
def _try_clean(self, lk_path):
self.logger.info("Attempting to clean lock path '%s'", lk_path)
lk = fasteners.Lock(lk_path)
for i in compat_range(0, self.acquire_attempts):
gotten = lk.acquire(blocking=True,
timeout=self.acquire_timeout)
if not gotten:
self.logger.debug("Failed acquiring lock path '%s' for the"
" %s time (%s attempts are left)", lk_path,
i + 1, self.acquire_attempts - (i + 1))
try:
if gotten:
try:
st = os.stat(lk_path)
os.unlink(lk_path)
except (OSError, IOError):
self.logger.info("Failed destroying lock path '%s'"
" even after lock acquisition"
" succeeded", exc_info=True)
else:
self.logger.info("Destroyed lock path '%s', it has"
" existed for %0.2f seconds",
lk_path, time.time() - st.st_ctime)
return True
finally:
if gotten:
lk.release()
return False
def run_forever(self):
while not self.dead.is_set():
for lk_path in _walk_maybe_paths(self.lock_dir,
recurse=self.recurse):
cleaned = self._try_clean(lk_path)
if cleaned:
self.logger.debug("Cleaned lock path '%s'", lk_path)
self.dead.wait(self.clean_delay)
def main():
import logging
import sys
logging.basicConfig(level=logging.DEBUG)
cj = CleanLocksJanitor(os.path.abspath(sys.argv[1]))
t = threading.Thread(target=cj.run_forever)
t.start()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment