Skip to content

Instantly share code, notes, and snippets.

@ls0f
Last active October 18, 2015 03:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ls0f/681a3faab241ea22c3ab to your computer and use it in GitHub Desktop.
Save ls0f/681a3faab241ea22c3ab to your computer and use it in GitHub Desktop.
Prevent the script from being run repeatedly
import atexit
import errno
import inspect
import os
from functools import wraps
class OneProcess(object):
    """Keep a script, or a single function, from running twice at once.

    The guard is a PID file: the running process writes its PID into a lock
    file, and later callers treat the lock as held while that PID still
    belongs to a live process.  A lock file whose PID is dead, missing or
    unparsable is considered stale and is taken over.

    The check-then-write sequence is not atomic, so two processes starting
    at the very same moment can both acquire the lock.  The liveness probe
    uses signal 0 and is therefore POSIX-only.
    """

    def __init__(self):
        # Frame record [0] is this __init__; record [1] is the code that
        # instantiated the class, and item [1] of a record is its file name.
        # That name is the stem for the default lock-file names.
        self.caller_file_name = inspect.getouterframes(
            inspect.currentframe())[1][1]

    @staticmethod
    def _read_pid(fn):
        """Return the positive PID stored in ``fn``, or None if unusable."""
        try:
            with open(fn) as f:
                pid = int(f.read().strip())
        except (IOError, OSError, ValueError):
            # Missing/unreadable file, or contents that are not an integer.
            return None
        # PID 0 and negative PIDs address process groups in kill(2); they
        # can never identify the single process that wrote the lock file.
        return pid if pid > 0 else None

    @staticmethod
    def _pid_alive(pid):
        """Return True if a process with ``pid`` currently exists."""
        try:
            os.kill(pid, 0)  # signal 0: existence check only, sends nothing
        except OverflowError:
            # Value too large to be a PID at all.
            return False
        except OSError as exc:
            # EPERM means the process exists but belongs to another user.
            return exc.errno == errno.EPERM
        return True

    @staticmethod
    def _remove_lock(fn):
        """Delete lock file ``fn``, tolerating one that is already gone."""
        try:
            os.remove(fn)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise

    def check_process_exit(self, fn):
        """Report whether lock file ``fn`` is held; take the lock if not.

        Args:
            fn: Path of the PID lock file.

        Returns:
            True if ``fn`` names a live process (nothing is written).
            False otherwise, after (over)writing ``fn`` with this process's
            PID.
        """
        pid = self._read_pid(fn)
        if pid is not None and self._pid_alive(pid):
            return True
        with open(fn, 'w') as f:
            f.write("%s" % os.getpid())
        return False

    def just_one_process(self, fn=None):
        """Exit immediately if another instance of the script holds the lock.

        Args:
            fn: Path of the lock file.  Defaults to
                ``"<caller file name>.run"``.

        Raises:
            SystemExit: With status 0 when the lock is already held by a
                live process.

        When the lock is acquired, an ``atexit`` handler is registered to
        delete the lock file at interpreter shutdown.
        """
        if fn is None:
            fn = "{}.run".format(self.caller_file_name)
        if self.check_process_exit(fn):
            print("{} is runing".format(self.caller_file_name))
            raise SystemExit(0)
        atexit.register(self._remove_lock, fn)

    def just_one_func(self, fn=None, *args, **kwargs):
        """Build a decorator that skips calls while the lock is held.

        Args:
            fn: Path of the lock file.  Defaults to
                ``"<caller file name>.<function name>.run"``.
            *args, **kwargs: Accepted and ignored.

        Returns:
            A decorator.  The decorated function returns None without
            running when the lock is held by a live process; otherwise it
            runs and the lock file is removed afterwards, even if the
            function raises.
        """
        def wrap_1(func):
            @wraps(func)
            def wrap_2(*args2, **kwargs2):
                if fn is None:
                    lock_fn = "{}.{}.run".format(
                        self.caller_file_name, func.__name__)
                else:
                    lock_fn = fn
                if self.check_process_exit(lock_fn):
                    print("{} {} is running".format(
                        self.caller_file_name, func.__name__))
                    return None
                try:
                    return func(*args2, **kwargs2)
                finally:
                    # Must not raise here, or it would mask func's own
                    # exception when the lock file has already vanished.
                    self._remove_lock(lock_fn)
            return wrap_2
        return wrap_1
from one_process import OneProcess
import time
# Instantiated at module level: OneProcess records the file name of the code
# that creates it and uses that name to build its default lock-file paths.
op = OneProcess()


@op.just_one_func()
def test(i):
    """Sleep for ``i`` seconds under the per-function lock."""
    time.sleep(i)


def main():
    """Placeholder workload: sleep for ten seconds."""
    time.sleep(10)


if __name__ == "__main__":
    # Take the script-wide lock first; exits here if it is already held.
    op.just_one_process()
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment