Skip to content

Instantly share code, notes, and snippets.

@dyno
Created May 16, 2021 06:23
Show Gist options
  • Save dyno/8ed6f7483f80ec61c44127c7a661ae93 to your computer and use it in GitHub Desktop.
Save dyno/8ed6f7483f80ec61c44127c7a661ae93 to your computer and use it in GitHub Desktop.
Make sure only one instance of a process is running on the system.
import fcntl
import os
import sys
from contextlib import contextmanager
from os.path import isfile

from absl import logging as log
@contextmanager
def single_process_lock(lockfile: str):
    """Hold an exclusive lock on *lockfile* for the duration of the ``with`` body.

    The file is opened for writing (created or truncated) and locked with a
    non-blocking, exclusive ``fcntl.lockf`` call. If the lock cannot be
    acquired, an error is logged and the process exits with status 1.

    Args:
        lockfile: Path of the file used as the lock.

    Yields:
        The open file object on which the lock is held.

    Raises:
        SystemExit: With code 1 when ``fcntl.lockf`` fails to take the lock.
        OSError: If *lockfile* cannot be opened for writing.
    """
    fp = open(lockfile, "w")
    try:
        try:
            fcntl.lockf(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            log.error("another instance is already running.")
            sys.exit(1)

        try:
            yield fp
        finally:
            # Release and clean up even when the with-body raises; this
            # branch is only reached once the lock has been acquired, so a
            # failed attempt never removes another holder's lock file.
            fcntl.lockf(fp, fcntl.LOCK_UN)
            if isfile(lockfile):
                try:
                    os.unlink(lockfile)
                except FileNotFoundError:
                    # The file vanished between the check and the unlink;
                    # nothing is left to remove.
                    pass
    finally:
        # Always close the descriptor, including on the failed-lock path.
        fp.close()
if __name__ == "__main__":
    import tempfile
    from os.path import join
    from time import sleep

    # Demo: hold the lock for a minute. A second copy of this script started
    # in the meantime fails to take the lock and exits with status 1.
    with single_process_lock(join(tempfile.gettempdir(), "myprocess.lock")):
        sleep(60)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment