Skip to content

Instantly share code, notes, and snippets.

@lbolla
Created January 17, 2014 15:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save lbolla/8475101 to your computer and use it in GitHub Desktop.
Save lbolla/8475101 to your computer and use it in GitHub Desktop.
Class to manage long running processes, restarting them when necessary
import concurrent.futures
class ProcessManager(object):
def __init__(self):
self.pool = concurrent.futures.ProcessPoolExecutor()
self.futures = {}
def submit(self, f, *args, **kwargs):
future = self.pool.submit(f, *args, **kwargs)
self.futures[future] = (f, args, kwargs)
def start(self):
while self.futures:
res = concurrent.futures.wait(
self.futures,
return_when=concurrent.futures.FIRST_EXCEPTION)
print 'Tasks', len(res.done), len(res.not_done)
for future in res.done:
f, args, kwargs = self.futures[future]
del self.futures[future]
exc = future.exception()
if exc is None:
self.on_success(future, exc, f, *args, **kwargs)
else:
self.on_error(future, exc, f, *args, **kwargs)
print 'Resubmitting'
self.submit(f, *args, **kwargs)
# Overwrite in subclasses
def on_error(self, future, exc, f, *args, **kwargs):
print 'Got exception from', future, exc
def on_success(self, future, exc, f, *args, **kwargs):
print 'Future finished', future, future.result()
if __name__ == '__main__':
import random
import time
def f(i):
print 'start'
while True:
time.sleep(0.1)
x = random.random()
if x < 0.1:
raise Exception('DEAD')
elif x < 0.9:
continue
else:
return 'hello world', i
pm = ProcessManager()
for i in xrange(5):
pm.submit(f, i)
pm.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment