Skip to content

Instantly share code, notes, and snippets.

@larytet
Created August 10, 2020 03:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save larytet/3ca9f9a32b1dc089a24cb7011455141f to your computer and use it in GitHub Desktop.
Save larytet/3ca9f9a32b1dc089a24cb7011455141f to your computer and use it in GitHub Desktop.
import psutil
import time
import os
import pickle
import signal
class Job():
    """Run a callable in a forked child process and collect its result.

    The child executes the callable, pickles the return value into a pipe
    and exits. The parent waits up to a deadline, interrupts the child with
    SIGINT if it is still alive, and reads the pickled result back.
    """

    def __init__(self, name, logger):
        """Remember the job *name* (used in results/logs) and the *logger*."""
        self.name, self.logger = name, logger
        # 0 means "nothing forked yet"; it is also the value seen by the child.
        self.pid = 0
        # True once the forked child has been collected with waitpid(). The
        # pid may be reused by an unrelated process after that point, so it
        # must not be probed any more.
        self._reaped = False

    def is_alive(self):
        """Return True while the forked child has not finished.

        A job that has not forked anything yet (pid 0) is reported as alive.
        A child is considered finished when its pid no longer exists, when it
        was already reaped, or when psutil reports it as zombie/dead. Any
        other state (running, sleeping, disk-sleep, stopped, ...) counts as
        alive so that join() does not return early on a child blocked in I/O.
        """
        if self.pid == 0:
            return True
        if self._reaped:
            return False
        if not psutil.pid_exists(self.pid):
            return False
        try:
            status = psutil.Process(self.pid).status()
        except psutil.NoSuchProcess:
            # The process vanished between pid_exists() and the status query.
            return False
        except Exception:  # pylint: disable=W0703
            # Best effort: if the state cannot be queried (e.g. access
            # denied) assume the process is still there.
            return True
        return status not in (psutil.STATUS_ZOMBIE, psutil.STATUS_DEAD)

    def join(self, timeout):
        """Poll until the child finishes or *timeout* seconds have passed.

        Returns None in both cases; use is_alive() to tell them apart.
        Returns immediately when nothing has been forked.
        """
        if self.pid == 0:
            return
        time_start = time.monotonic()
        # Poll at 5% of the timeout, but never slower than every 5 ms.
        polling_time = min(0.05*timeout, 0.005)
        while time.monotonic()-time_start < timeout and self.is_alive():
            time.sleep(polling_time)

    def call(self, timeout, func, *args, **kwargs):
        """Run ``func(*args, **kwargs)`` in a forked child with a deadline.

        Args:
            timeout: seconds to wait for the child before interrupting it.
            func: callable executed in the child; its return value must be
                picklable.
            *args, **kwargs: forwarded to *func*.

        Returns:
            A dict with the keys "name", "pid" (pid of the forked child) and
            "error". On success "func_results" holds the value returned by
            *func*. On failure "error" is True and one or more of
            "error.deadline", "error.is_alive", "error.failed_to_kill",
            "error.exception" and "error.pickle" are set.

        Raises:
            OSError: if the pipe or the fork cannot be created.
        """
        pipe_read, pipe_write = os.pipe()
        try:
            self.pid = os.fork()
        except OSError:
            # Do not leak the pipe when the fork itself fails.
            os.close(pipe_read)
            os.close(pipe_write)
            raise
        if self.pid == 0:
            # Child process: never returns.
            self._run_child(pipe_read, pipe_write, func, args, kwargs)

        self._reaped = False
        results = {
            "name": self.name,
            "pid": self.pid,
            "error": False,
        }
        start_time = time.monotonic()
        os.close(pipe_write)
        # NOTE(review): the pipe is read only after join() returns. A child
        # whose pickled result does not fit in the pipe buffer will block in
        # write() until the deadline and then be interrupted.
        self.join(timeout)
        elapsed = time.monotonic() - start_time
        if elapsed > timeout:
            results["error.deadline"] = True
            results["error"] = True
        if self.is_alive():
            results["error.is_alive"] = True
            results["error"] = True
            try:
                # If timeout is zero let the forked process set its signal
                # handler before it gets killed
                time.sleep(0.01)
                os.kill(self.pid, signal.SIGINT)
            except Exception as e:  # pylint: disable=W0703
                results["error.failed_to_kill"] = True
                results["error.exception"] = e
                results["error"] = True
                self.logger.error(f"{self.name}: failed to kill {self.pid}")

        # The data comes from our own forked child, so unpickling is trusted.
        with os.fdopen(pipe_read, 'rb') as fd_read:
            try:
                results["func_results"] = pickle.load(fd_read)
            except Exception:  # pylint: disable=W0703
                # Nothing (or a truncated stream) was written, e.g. the
                # child was interrupted or func raised.
                results["error.pickle"] = True
                results["error"] = True
        self._reap()
        return results

    def _run_child(self, pipe_read, pipe_write, func, args, kwargs):
        """Child side of call(): run func, pickle its result, then exit."""
        exit_code = 1
        try:
            # If I get a BREAK call os._exit()
            signal.signal(signal.SIGINT, signal_handler)
            os.close(pipe_read)
            func_results = func(*args, **kwargs)
            with os.fdopen(pipe_write, 'wb') as fd_write:
                pickle.dump(func_results, fd_write)
            exit_code = 0
        except Exception as exc:  # pylint: disable=W0703
            self.logger.error(f"{self.name}: forked process failed: {exc!r}")
        finally:
            # Always leave through _exit, whatever happened above; otherwise
            # the child would unwind into the caller's code and keep running
            # as a duplicate of the parent.
            # pylint: disable=W0212
            os._exit(exit_code)

    def _reap(self):
        """Collect the exit status of the child so it does not stay a zombie.

        Waits for a short, bounded time only, so a child that refuses to die
        cannot hang the caller here.
        """
        deadline = time.monotonic() + 0.1
        while True:
            try:
                reaped_pid, _ = os.waitpid(self.pid, os.WNOHANG)
            except ChildProcessError:
                # Already collected elsewhere (e.g. SIGCHLD is ignored).
                self._reaped = True
                return
            if reaped_pid != 0:
                self._reaped = True
                return
            if time.monotonic() >= deadline:
                return
            time.sleep(0.001)
def signal_handler(sig, frame):  # pylint: disable=W0613
    """Terminate the current process immediately with exit status 0.

    Has the (signal number, stack frame) signature expected of a handler
    registered through signal.signal(); both arguments are ignored.
    """
    os._exit(0)  # pylint: disable=W0212
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment