Skip to content

Instantly share code, notes, and snippets.

@mx-moth
Last active August 2, 2018 01:36
Show Gist options
  • Save mx-moth/79c0bba89a242bbf5c4682ac9686ce92 to your computer and use it in GitHub Desktop.
Save mx-moth/79c0bba89a242bbf5c4682ac9686ce92 to your computer and use it in GitHub Desktop.
Python multiprocess FileLock class using fcntl.flock()
import fcntl
import os
from contextlib import contextmanager
# Human-readable labels for the two flock() lock modes, used when
# describing a lock in reprs and error messages.
MODE_NAMES = dict([
    (fcntl.LOCK_SH, 'shared'),
    (fcntl.LOCK_EX, 'exclusive'),
])
class FileLock:
    """
    Inter-process file lock built on ``fcntl.flock``.

    The lock file can be locked in two modes, each reachable under two
    attribute names:

    * ``reader`` / ``shared``    -- ``fcntl.LOCK_SH``
    * ``writer`` / ``exclusive`` -- ``fcntl.LOCK_EX``

    This object records whether it currently holds a lock (``held``) and in
    which mode (``mode``). The mode helpers update that state through
    ``_acquire`` / ``_release``. The lock file is opened lazily the first
    time ``handle`` is read.
    """

    def __init__(self, filename):
        self.filename = filename
        shared_lock = _FileLock(self, fcntl.LOCK_SH)
        exclusive_lock = _FileLock(self, fcntl.LOCK_EX)
        # Each mode is exposed under two aliases.
        self.reader = shared_lock
        self.shared = shared_lock
        self.writer = exclusive_lock
        self.exclusive = exclusive_lock
        # Nothing is held and no file has been opened yet.
        self.held = False
        self.mode = None
        self._handle = None

    @property
    def handle(self):
        """The open lock file, opened on first use or reopened after close()."""
        stale = self._handle is None or self._handle.closed
        if stale:
            # 'w+' creates the file if it is missing (and truncates it).
            self._handle = open(self.filename, 'w+')
        return self._handle

    def close(self):
        """
        Close the lock-file handle, if one is open.

        :raises RuntimeError: if a lock is currently held; release it first.
        """
        if self.held:
            raise RuntimeError("Can't close held lock, release it first")
        current = self._handle
        if current and not current.closed:
            current.close()
        self._handle = None

    def delete(self):
        """
        Close the handle and remove the lock file from disk.

        :raises RuntimeError: if a lock is currently held (raised by close()).
        """
        self.close()
        os.remove(self.filename)

    def _acquire(self, mode):
        # Invoked by a mode helper once its flock() call has succeeded.
        self.held, self.mode = True, mode

    def _release(self):
        # Invoked by a mode helper once the lock has been released.
        self.held, self.mode = False, None

    def __repr__(self):
        state = MODE_NAMES[self.mode] if self.held else 'not held'
        return f'<{type(self).__name__}: {self.filename} ({state})>'
class _FileLock:
    """
    One lock mode (shared or exclusive) of a parent FileLock.

    Not meant to be built directly: obtain one through the ``reader`` /
    ``shared`` or ``writer`` / ``exclusive`` attributes of a FileLock. Use it
    through acquire()/release(), or call it to get a context manager:
    ``with lock.exclusive(): ...``.
    """

    def __init__(self, file_lock, mode):
        # Parent FileLock that owns the file handle and the held/mode state.
        self.file_lock = file_lock
        # The flock() operation this helper requests (LOCK_SH or LOCK_EX).
        self.mode = mode

    def acquire(self, blocking=True):
        """
        Take this mode's lock on the parent's lock file.

        :param blocking: when true (the default), wait for the lock; when
            false, add ``LOCK_NB`` so ``fcntl.flock`` raises an ``OSError``
            instead of waiting.
        :raises RuntimeError: if the parent already holds a lock, in either mode.
        """
        parent = self.file_lock
        if parent.held:
            held_name = MODE_NAMES[parent.mode]
            raise RuntimeError(f"Lock is already held as {held_name}")
        operation = self.mode
        if not blocking:
            operation |= fcntl.LOCK_NB
        fcntl.flock(parent.handle, operation)
        # Record the new state only after flock() has succeeded.
        parent._acquire(self.mode)

    def release(self):
        """
        Release this mode's lock.

        :raises RuntimeError: if nothing is held, or if the parent holds the
            lock in the other mode.
        """
        parent = self.file_lock
        if not parent.held:
            raise RuntimeError("Lock is not held")
        if parent.mode != self.mode:
            held_name = MODE_NAMES[parent.mode]
            wanted_name = MODE_NAMES[self.mode]
            raise RuntimeError(f"Lock is held as {held_name} not {wanted_name}")
        fcntl.flock(parent.handle, fcntl.LOCK_UN)
        parent._release()

    @contextmanager
    def __call__(self, blocking=True):
        """Hold this lock for the duration of a ``with`` block."""
        # Acquire outside the try: if acquiring fails, there is nothing to release.
        self.acquire(blocking=blocking)
        try:
            yield
        finally:
            self.release()
import datetime
import logging
import logging.config
import random
import time
from concurrent.futures import ProcessPoolExecutor
from functools import wraps
from multiprocessing import cpu_count
from os import makedirs
from pathlib import Path
from shutil import rmtree
from filelock import FileLock
# Shared logger for the demo; named 'locks' so logging configuration can
# target it by that name.
logger = logging.getLogger('locks')
# Directory that receives the per-timestep output files.
OUTPUT = Path('output')
# Lock files live in a hidden subdirectory of the output directory.
LOCKS = OUTPUT.joinpath('.locks')
def log_exceptions(func):
    """
    Wrap ``func`` so that any exception it raises is logged, then re-raised.

    The traceback is written through ``logger.exception`` with the message
    "Whoops", and the exception still propagates to the caller unchanged.
    This makes failures visible even when the caller never inspects them,
    e.g. a job submitted to a process pool whose Future is never checked.
    ``functools.wraps`` preserves ``func``'s name, docstring and
    ``__wrapped__``.
    """
    @wraps(func)
    def _logged_call(*args, **kwargs):
        try:
            outcome = func(*args, **kwargs)
        except Exception:
            logger.exception("Whoops")
            raise
        return outcome

    return _logged_call
@log_exceptions
def do_the_thing(band, timesteps):
    """
    Append ``band``'s output to the shared per-timestep output files.

    For each timestep:

    1. Sleep for a random "preprocessing" period.
    2. Take an exclusive lock on ``LOCKS / 'output_<timestep>.grb.lock'``.
    3. While holding the lock, append two lines to
       ``OUTPUT / 'output_<timestep>.grb'``, so that writes from different
       bands to the same file do not interleave.

    :param band: label written into the output lines and log messages.
    :param timesteps: iterable of timestep strings used to build file names.
    """
    logger.debug(f"Processing band {band}")
    for timestep in timesteps:
        # Preprocess the things
        time.sleep(random.random() * 3)

        # Get the locks
        filename = f'output_{timestep}.grb'
        lock = FileLock(LOCKS / f'{filename}.lock')
        try:
            logger.debug(f"{band}, {timestep}: Acquiring lock {lock!r}...")
            with lock.exclusive():
                logger.debug(f"{band}, {timestep}: Got lock {lock!r}!")
                # Open the output file for writing
                with open(OUTPUT / filename, 'a') as output:
                    # Write all the things
                    output.write(f'{band}: Writing...\n')
                    output.flush()
                    time.sleep(random.random() * 2)
                    output.write(f'{band}: Done!\n')
                    output.flush()
                logger.debug(f"{band}, {timestep}: Releasing lock {lock!r}...")
        finally:
            # The lock's shared/exclusive helpers hold a reference back to it,
            # so dropping ``lock`` leaves the open lock-file handle alive until
            # the cyclic GC runs. Close it explicitly once the lock is released.
            lock.close()
def main():
    """
    Run the locking demo end to end.

    Steps:

    1. Configure the ``locks`` logger.
    2. Build a shuffled list of bands and three daily timesteps.
    3. Run ``do_the_thing`` for every band in a process pool and wait for
       all jobs to finish.
    4. Delete the lock directory.
    """
    # NOTE(review): this configuration is applied in the parent process only.
    # Workers inherit it under the 'fork' start method. Under 'spawn' they
    # would not, and their DEBUG messages would be dropped. Confirm the
    # target platform.
    logging.config.dictConfig({
        'version': 1,
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'DEBUG',
                'formatter': 'multiprocess',
            },
        },
        'formatters': {
            'multiprocess': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(processName)-7s %(message)s',
            },
        },
        'loggers': {
            'locks': {
                'level': 'DEBUG',
                'handlers': ['console'],
            },
        },
    })

    bands = [band.rjust(5) for band in [
        'foo', 'bar', 'baz', 'quux',
        'earth', 'fire', 'wind', 'water', 'heart'
    ]]
    # Randomise the order in which bands are submitted to the pool.
    random.shuffle(bands)

    # ISO dates for today and the following two days.
    today = datetime.date.today()
    timesteps = [
        (today + datetime.timedelta(days=n)).isoformat()
        for n in range(3)]

    makedirs(OUTPUT, exist_ok=True)
    makedirs(LOCKS, exist_ok=True)

    # Leaving the ``with`` block calls pool.shutdown(wait=True), so every
    # submitted job has finished before "Done!" is logged. No explicit
    # shutdown() call is needed.
    with ProcessPoolExecutor(max_workers=cpu_count()) as pool:
        for band in bands:
            pool.submit(do_the_thing, band, timesteps)
        logger.debug("Submitted all the jobs")

    logger.debug("Done!")
    # All workers started above have finished, so none of them still uses the
    # lock files.
    rmtree(LOCKS)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment