Skip to content

Instantly share code, notes, and snippets.

@Koed00
Last active February 4, 2023 21:51
Show Gist options
  • Save Koed00/3f6de495ea6af455e95e to your computer and use it in GitHub Desktop.
Save Koed00/3f6de495ea6af455e95e to your computer and use it in GitHub Desktop.
Multi horse worker stable test
from multiprocessing import Process, Queue, current_process
from random import randint
from time import sleep
import sys
def something():
x = randint(1, 20)
if x == 9:
# cause some trouble
raise InterruptedError
sleep(x)
return True
class Worker(object):
def __init__(self):
# TODO stable size should be settable
self.stable_size = 4
self.stable = []
self.task_queue = Queue()
self.done_queue = Queue()
for i in range(self.stable_size):
self._spawn_horse()
def _spawn_horse(self):
# This is just for PyCharm to not crash. Ignore it.
if not hasattr(sys.stdin, 'close'):
def dummy_close():
pass
sys.stdin.close = dummy_close
p = Process(target=self.horse, args=(self.task_queue, self.done_queue))
self.stable.append(p)
p.start()
@staticmethod
def horse(queue_in, queue_out):
name = current_process().name
print(name, 'Ready for work at {}'.format(current_process().pid))
for func in iter(queue_in.get, 'STOP'):
print(name, 'Starting Job {}'.format(func))
result = something()
queue_out.put(result)
print(name, 'Exiting Job {}'.format(func))
print(name, 'Stopped')
def work(self, job):
# Everyone still alive?
self.stable_boy()
# Put the job in the queue
self.task_queue.put(job)
def stable_boy(self):
# Check if all the horses are alive
for p in list(self.stable):
if not p.is_alive():
# Be humane
p.terminate()
self.stable.remove(p)
# Replace it with a fresh one
self._spawn_horse()
def exit(self):
# Send the STOP signal to the stable
for i in range(self.stable_size):
self.task_queue.put('STOP')
# Optional: Delete everything in the queue and then add STOP
# Wait for all the workers to finish the queue
for p in self.stable:
p.join()
print('All horses done.')
def test_worker(self):
for i in range(20):
print('Queuing Job {}'.format(i))
self.work(i)
@Koed00
Copy link
Author

Koed00 commented Jul 6, 2015

I have kept working on this the last two weeks and added recycling and timeout capability.
You can have a look at it here https://github.com/Koed00/django-q in cluster.py. Feedback is much appreciated.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment