Created
December 1, 2015 17:03
-
-
Save harlowja/b4f0ddadbda1f92cc1e2 to your computer and use it in GitHub Desktop.
cleaner.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import threading | |
import time | |
import fasteners | |
from six.moves import range as compat_range | |
LOG = logging.getLogger(__name__) | |
def _walk_maybe_paths(lock_dir, recurse=False, filter_func=os.path.isfile): | |
if os.path.isdir(lock_dir): | |
if recurse: | |
for (dir_path, _dir_names, file_names) in os.walk(lock_dir): | |
for fn in file_names: | |
fn_path = os.path.join(dir_path, fn) | |
if filter_func(fn_path): | |
yield fn_path | |
else: | |
for fn in os.listdir(lock_dir): | |
fn_path = os.path.join(lock_dir, fn) | |
if filter_func(fn_path): | |
yield fn_path | |
class CleanLocksJanitor(object): | |
def __init__(self, lock_dir, clean_delay=60.0, | |
acquire_timeout=0.1, acquire_attempts=3, | |
recurse=False, logger=None): | |
self.lock_dir = lock_dir | |
self.acquire_timeout = acquire_timeout | |
self.acquire_attempts = acquire_attempts | |
self.recurse = recurse | |
self.dead = threading.Event() | |
self.clean_delay = clean_delay | |
self.logger = logger or LOG | |
def _try_clean(self, lk_path): | |
self.logger.info("Attempting to clean lock path '%s'", lk_path) | |
lk = fasteners.Lock(lk_path) | |
for i in compat_range(0, self.acquire_attempts): | |
gotten = lk.acquire(blocking=True, | |
timeout=self.acquire_timeout) | |
if not gotten: | |
self.logger.debug("Failed acquiring lock path '%s' for the" | |
" %s time (%s attempts are left)", lk_path, | |
i + 1, self.acquire_attempts - (i + 1)) | |
try: | |
if gotten: | |
try: | |
st = os.stat(lk_path) | |
os.unlink(lk_path) | |
except (OSError, IOError): | |
self.logger.info("Failed destroying lock path '%s'" | |
" even after lock acquisition" | |
" succeeded", exc_info=True) | |
else: | |
self.logger.info("Destroyed lock path '%s', it has" | |
" existed for %0.2f seconds", | |
lk_path, time.time() - st.st_ctime) | |
return True | |
finally: | |
if gotten: | |
lk.release() | |
return False | |
def run_forever(self): | |
while not self.dead.is_set(): | |
for lk_path in _walk_maybe_paths(self.lock_dir, | |
recurse=self.recurse): | |
cleaned = self._try_clean(lk_path) | |
if cleaned: | |
self.logger.debug("Cleaned lock path '%s'", lk_path) | |
self.dead.wait(self.clean_delay) | |
def main(): | |
import logging | |
import sys | |
logging.basicConfig(level=logging.DEBUG) | |
cj = CleanLocksJanitor(os.path.abspath(sys.argv[1])) | |
t = threading.Thread(target=cj.run_forever) | |
t.start() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment