Gist by nvie, created August 30, 2012 21:22
Initial experiment with a possible new worker structure for RQ
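from __future__ import print_function

# Patch the standard library so blocking calls such as time.sleep() yield to
# other greenlets; this should happen as early as possible.
from gevent import monkey
monkey.patch_all()

import gevent.pool
import os
import random
import time
import datetime
from multiprocessing import Semaphore, Array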
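

class BaseWorker(object):
    def work(self):
        # Main loop: keep starting jobs forever. spawn_child() is expected
        # to block while all worker slots are busy.
        while True:
            self.spawn_child()

    def spawn_child(self):
        raise NotImplementedError('Implement this in a subclass.')

    def fake_work(self):
        # Stand-in for a real job: sleep for a random 0-3 seconds.
        sleep_time = 3 * random.random()
        print(datetime.datetime.now(), '- Hello from', os.getpid(),
              '- %.3fs' % sleep_time)
        time.sleep(sleep_time)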
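

class ForkingWorker(BaseWorker):
    def __init__(self, num_processes=1):
        # Set up sync primitives, to communicate with the spawned children
        self._semaphore = Semaphore(num_processes)
        # One slot per child: 0 = free, -1 = reserved (fork in progress),
        # any other value = PID of a running child
        self._slots = Array('i', [0] * num_processes)

    def _reap_children(self):
        # Collect the exit status of any finished children; without this
        # they would linger as zombie processes.
        while True:
            try:
                pid, _ = os.waitpid(-1, os.WNOHANG)
            except OSError:  # no child processes left
                return
            if pid == 0:  # children exist, but none has exited yet
                return

    def spawn_child(self):
        """Forks and executes the job."""
        self._semaphore.acquire()  # responsible for the blocking
        self._reap_children()

        # Select an empty slot from self._slots (the first 0 value is picked)
        # and reserve it before forking. Having acquired the semaphore, there
        # is always at least one empty slot.
        for slot, value in enumerate(self._slots):
            if value == 0:
                break
        self._slots[slot] = -1

        # The usual hardcore forking action
        child_pid = os.fork()
        if child_pid == 0:
            # Within child: reseed so each child gets its own random numbers
            random.seed()
            try:
                self.fake_work()
            finally:
                # This is the new stuff. Remember, we're in the child process
                # currently. When all work is done here, free up the current
                # slot (by writing a 0 in the slot position). This
                # communicates to the parent that the current child has died
                # (so can safely be forgotten about).
                with self._slots.get_lock():
                    self._slots[slot] = 0
                self._semaphore.release()
                os._exit(0)
        else:
            # Within parent, keep track of the new child by writing its PID
            # into the reserved slot, unless the child has already finished
            # and freed it. The lock makes this check-and-set atomic.
            with self._slots.get_lock():
                if self._slots[slot] == -1:
                    self._slots[slot] = child_pid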
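

class GeventWorker(BaseWorker):
    def __init__(self, num_processes=1):
        # Despite the parameter name, this bounds the number of concurrent
        # greenlets, not processes.
        self._pool = gevent.pool.Pool(num_processes)

    def spawn_child(self):
        """Runs the job in a new greenlet (no fork involved).

        Pool.spawn() blocks while the pool is full, so no explicit
        semaphore is needed here.
        """
        self._pool.spawn(self.fake_work)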


if __name__ == '__main__':
    # fw = ForkingWorker(4)
    # fw.work()
    gw = GeventWorker(4)
    gw.work()
nvie commented Aug 30, 2012

This change adds a _slots instance variable that is a multiprocessing.Array: a ctypes array allocated in shared memory (protected by a lock by default), so values written by a forked child are visible to the parent. The array keeps track of the PIDs of the children that are currently running.
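A minimal sketch of why the shared array matters, assuming a POSIX system where os.fork is available: a value a forked child writes into a multiprocessing.Array is visible to the parent afterwards, whereas a write to an ordinary list only changes the child's private copy. The function name demo is illustrative only.

```python
import os
from multiprocessing import Array


def demo():
    # Shared-memory array of C ints, created BEFORE forking so the child
    # inherits a mapping of the same memory.
    shared = Array('i', [0] * 4)
    plain = [0] * 4  # ordinary list: the child only gets a private copy

    pid = os.fork()
    if pid == 0:
        # In the child: record our PID in slot 0 of both containers.
        shared[0] = os.getpid()
        plain[0] = os.getpid()
        os._exit(0)  # leave immediately, without running parent-side code

    os.waitpid(pid, 0)  # wait for (and reap) the child
    return pid, shared[:], plain


if __name__ == '__main__':
    print(demo())
```

Only the shared array reports the child's PID back to the parent, which is exactly what lets a child free its own slot in ForkingWorker.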

(We need access to the currently running children to be able to kill them when requested. This is something for later, though.)
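Looking ahead, the PIDs in the slots array are what would make killing possible. Below is a rough sketch, assuming POSIX signals and the convention that a slot holding a positive value is a running child's PID; kill_children and demo are hypothetical helpers, not part of RQ.

```python
import os
import signal
import time
from multiprocessing import Array


def kill_children(slots, sig=signal.SIGTERM):
    """Hypothetical helper: send `sig` to every child PID stored in `slots`.

    Slots holding 0 (or any non-positive value) are treated as free and
    skipped. Returns the list of PIDs that were signalled.
    """
    with slots.get_lock():  # take a consistent snapshot of the slots
        pids = [pid for pid in slots[:] if pid > 0]
    signalled = []
    for pid in pids:
        try:
            os.kill(pid, sig)
            signalled.append(pid)
        except ProcessLookupError:
            pass  # the child is already gone
    return signalled


def demo():
    slots = Array('i', [0] * 2)
    for i in range(len(slots)):
        pid = os.fork()
        if pid == 0:
            # Use the default action (terminate) for SIGTERM in this child
            signal.signal(signal.SIGTERM, signal.SIG_DFL)
            time.sleep(60)  # stand-in for a long-running job
            os._exit(0)
        slots[i] = pid  # the parent records the child's PID

    killed = kill_children(slots)
    # Reap each killed child so it does not stay around as a zombie
    statuses = [os.waitpid(pid, 0)[1] for pid in killed]
    for i in range(len(slots)):
        slots[i] = 0  # mark every slot free again
    return killed, statuses


if __name__ == '__main__':
    print(demo())
```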

nvie commented Aug 30, 2012

Now the GeventWorker implementation is added. It is almost trivial, since pool.spawn already has all the niceties we need: it blocks while the pool is full, and a slot frees up as soon as one of the greenlets finishes.
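Pool.spawn takes over the bookkeeping that ForkingWorker does by hand with its Semaphore. To make that concrete without requiring gevent, here is a sketch that rebuilds the same acquire-before-spawn, release-when-done pattern with stdlib threads standing in for greenlets. BoundedSpawner and demo are hypothetical names, not part of gevent or RQ.

```python
import threading
import time


class BoundedSpawner(object):
    """Hypothetical thread-based stand-in for gevent.pool.Pool.

    spawn() blocks while `size` tasks are still running, mirroring the
    behaviour GeventWorker relies on from Pool.spawn().
    """

    def __init__(self, size):
        self._free = threading.BoundedSemaphore(size)
        self._threads = []

    def spawn(self, func, *args):
        self._free.acquire()  # wait for a free slot

        def run():
            try:
                func(*args)
            finally:
                self._free.release()  # give the slot back when the task ends

        thread = threading.Thread(target=run)
        thread.start()
        self._threads.append(thread)
        return thread

    def join(self):
        for thread in self._threads:
            thread.join()


def demo(size=4, jobs=12):
    lock = threading.Lock()
    state = {'running': 0, 'peak': 0, 'done': 0}

    def job():
        with lock:
            state['running'] += 1
            state['peak'] = max(state['peak'], state['running'])
        time.sleep(0.01)  # pretend to do some work
        with lock:
            state['running'] -= 1
            state['done'] += 1

    pool = BoundedSpawner(size)
    for _ in range(jobs):
        pool.spawn(job)  # blocks once `size` jobs are in flight
    pool.join()
    return state['peak'], state['done']


if __name__ == '__main__':
    print(demo())
```

With gevent, the wait inside spawn() yields to other greenlets instead of blocking an OS thread, but the slot accounting is the same idea.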
