@Koed00
Last active February 4, 2023 21:51
Multi horse worker stable test
from multiprocessing import Process, Queue, current_process
from random import randint
from time import sleep
import sys


def something():
    x = randint(1, 20)
    if x == 9:
        # cause some trouble
        raise InterruptedError
    sleep(x)
    return True


class Worker(object):

    def __init__(self):
        # TODO stable size should be settable
        self.stable_size = 4
        self.stable = []
        self.task_queue = Queue()
        self.done_queue = Queue()
        for i in range(self.stable_size):
            self._spawn_horse()

    def _spawn_horse(self):
        # This is just for PyCharm to not crash. Ignore it.
        if not hasattr(sys.stdin, 'close'):
            def dummy_close():
                pass

            sys.stdin.close = dummy_close
        p = Process(target=self.horse, args=(self.task_queue, self.done_queue))
        self.stable.append(p)
        p.start()

    @staticmethod
    def horse(queue_in, queue_out):
        name = current_process().name
        print(name, 'Ready for work at {}'.format(current_process().pid))
        for func in iter(queue_in.get, 'STOP'):
            print(name, 'Starting Job {}'.format(func))
            result = something()
            queue_out.put(result)
            print(name, 'Exiting Job {}'.format(func))
        print(name, 'Stopped')

    def work(self, job):
        # Everyone still alive?
        self.stable_boy()
        # Put the job in the queue
        self.task_queue.put(job)

    def stable_boy(self):
        # Check if all the horses are alive
        for p in list(self.stable):
            if not p.is_alive():
                # Be humane
                p.terminate()
                self.stable.remove(p)
                # Replace it with a fresh one
                self._spawn_horse()

    def exit(self):
        # Send the STOP signal to the stable
        for i in range(self.stable_size):
            self.task_queue.put('STOP')
        # Optional: Delete everything in the queue and then add STOP
        # Wait for all the workers to finish the queue
        for p in self.stable:
            p.join()
        print('All horses done.')

    def test_worker(self):
        for i in range(20):
            print('Queuing Job {}'.format(i))
            self.work(i)
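The core idea above — horses pull jobs from a queue, a crashing job kills its horse, and a supervisor pass (`stable_boy`) replaces dead horses — can be shown end to end in a minimal, self-contained sketch. This is not part of the gist; the names `horse` and `run`, the `'crash'` job, and the tiny job payloads are mine, chosen so the whole run finishes in about a second instead of sleeping up to 20 seconds per job:

```python
# Supervision sketch (assumption: my own code, not from the gist).
# Two horses double numbers from a queue; the 'crash' job kills one,
# and a supervisor pass replaces it with a fresh horse.
from multiprocessing import Process, Queue
import time


def horse(queue_in, queue_out):
    # Pull jobs until the 'STOP' sentinel arrives.
    for job in iter(queue_in.get, 'STOP'):
        if job == 'crash':
            raise RuntimeError('simulated horse death')
        queue_out.put(job * 2)


def run():
    task_queue, done_queue = Queue(), Queue()
    stable = [Process(target=horse, args=(task_queue, done_queue))
              for _ in range(2)]
    for p in stable:
        p.start()
    task_queue.put(1)
    task_queue.put('crash')  # kill one horse on purpose
    time.sleep(1)            # give the crash time to happen
    # Supervisor pass, like Worker.stable_boy(): replace dead horses.
    replaced = 0
    for p in list(stable):
        if not p.is_alive():
            p.terminate()
            stable.remove(p)
            fresh = Process(target=horse, args=(task_queue, done_queue))
            fresh.start()
            stable.append(fresh)
            replaced += 1
    task_queue.put(2)  # the refilled stable keeps working
    results = sorted(done_queue.get(timeout=5) for _ in range(2))
    # Shut down: one 'STOP' sentinel per remaining horse, then join.
    for _ in stable:
        task_queue.put('STOP')
    for p in stable:
        p.join()
    return replaced, results


if __name__ == '__main__':
    print(run())
```

Exactly one horse dies (only one `'crash'` job is queued), so `run()` reports one replacement, and both real jobs still produce results because the survivor and the fresh horse keep draining the queue.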
selwin commented Jun 18, 2015

Thanks for this! In RQ's built-in worker, the parent process is responsible for fetching jobs from the queue before feeding them to the work horse. Looking at your code above, it seems like each horse will be responsible for doing the fetching and storing results in Redis, is that correct?

If we were to move to a concurrent worker model, I think this is also the right approach. Turning the parent process into a supervisor that periodically checks whether its horses are alive would also make RQ more reliable, since it could detect horses that die from unexpected errors (there's also an issue for this somewhere).

Koed00 commented Jul 6, 2015

I have kept working on this for the last two weeks and added recycling and timeout capability.
You can have a look at it here: https://github.com/Koed00/django-q in cluster.py. Feedback is much appreciated.
