@agrif
Created December 11, 2011 21:51
## jobs.py
import multiprocessing
import itertools
import Queue
from signals import Signal
##
## first, some generic interfaces
##
class JobProvider(object):
    """An interface that can be implemented to provide jobs to a
    JobManager implementation."""

    def get_jobs(self):
        """Returns an iterator of job objects that are used as
        arguments to do_job."""
        raise NotImplementedError("get_jobs")

    def do_job(self, job):
        """Does whatever work is associated with the given job."""
        raise NotImplementedError("do_job")
class JobManager(object):
    """A generic interface for distributing jobs to worker processes
    (or elsewhere). It is also responsible for intercepting and
    relaying signals back to the Python instance where do_all_jobs
    was called."""

    def do_all_jobs(self, job_providers):
        """Run all the jobs provided by the given list of
        providers."""
        raise NotImplementedError("do_all_jobs")
##
## now a specific JobManager that uses multiprocessing
##
# worker process class
class MultiprocessingJobManagerProcess(multiprocessing.Process):
    def __init__(self, job_providers, job_queue, result_queue, signal_queue):
        super(MultiprocessingJobManagerProcess, self).__init__()
        self.providers = job_providers
        self.job_queue = job_queue
        self.result_queue = result_queue
        self.signal_queue = signal_queue

    def run(self):
        # register for all available signals, relaying each emission
        # through the signal queue back to the main process
        def register_signal(name, sig):
            def handler(*args, **kwargs):
                self.signal_queue.put((name, args, kwargs), False)
            sig.set_interceptor(handler)
        for name, sig in Signal.signals.iteritems():
            register_signal(name, sig)

        # job loop: block briefly instead of spinning when the queue
        # is empty; the manager terminates us when all work is done
        while 1:
            try:
                i, job = self.job_queue.get(True, 0.1)
                result = self.providers[i].do_job(job)
                self.result_queue.put(result, False)
            except Queue.Empty:
                pass
class MultiprocessingJobManager(JobManager):
    def __init__(self, pool_size=0):
        if pool_size <= 0:
            pool_size = multiprocessing.cpu_count()
        self.pool_size = pool_size

    def _handle_messages(self, result_queue, signal_queue, timeout=0.0):
        # drain both queues: re-emit relayed signals, and count how
        # many jobs have finished
        finished_jobs = 0
        result_empty = False
        signal_empty = False
        while not (result_empty and signal_empty):
            if not result_empty:
                try:
                    if timeout > 0.0:
                        result = result_queue.get(True, timeout)
                        # the timeout should only apply once
                        timeout = 0.0
                    else:
                        result = result_queue.get(False)
                    finished_jobs += 1
                except Queue.Empty:
                    result_empty = True
            if not signal_empty:
                try:
                    name, args, kwargs = signal_queue.get(False)
                    sig = Signal.signals[name]
                    sig.emit_intercepted(*args, **kwargs)
                except Queue.Empty:
                    signal_empty = True
        return finished_jobs

    def do_all_jobs(self, job_providers):
        # make sure job_providers is a list
        try:
            job_providers = list(job_providers)
        except TypeError:
            job_providers = [job_providers]

        # create the job queue, result queue and signal queue
        job_queue = multiprocessing.Queue()
        result_queue = multiprocessing.Queue()
        signal_queue = multiprocessing.Queue()

        # create and start the process pool
        pool = []
        for i in range(self.pool_size):
            proc = MultiprocessingJobManagerProcess(job_providers, job_queue, result_queue, signal_queue)
            pool.append(proc)
        for p in pool:
            p.start()

        # distribute work, interleaving jobs from the available
        # providers; throttle so the job queue never holds more than
        # 10 outstanding jobs per worker
        outstanding_jobs = 0
        provider_jobs = map(lambda p: p.get_jobs(), job_providers)
        for joblist in itertools.izip_longest(*provider_jobs):
            for i, job in enumerate(joblist):
                if job is not None:
                    job_queue.put((i, job), False)
                    outstanding_jobs += 1
            while outstanding_jobs > self.pool_size * 10:
                outstanding_jobs -= self._handle_messages(result_queue, signal_queue, timeout=1.0)

        # finish all remaining jobs, then terminate the processes
        while outstanding_jobs > 0:
            outstanding_jobs -= self._handle_messages(result_queue, signal_queue, timeout=1.0)
        for p in pool:
            p.terminate()
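##
## a tiny driver for the SquareProvider sketch above (illustrative
## only): run 10 squaring jobs across 2 worker processes
##
if __name__ == "__main__":
    MultiprocessingJobManager(2).do_all_jobs([SquareProvider(10)])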
## signals.py
##
## a way to create named "signals" that, when emitted, call a set
## of registered functions
##
class Signal(object):
    """A mechanism for registering functions to be called whenever
    some specified event happens. This object is designed to work
    with JobManager so that functions can register to always run in
    the main Python instance."""

    # a global registry of signals, indexed by full name
    # this is used by JobManagers to register and relay signals
    signals = {}

    def __init__(self, namespace, name):
        """Creates a signal. Namespace and name should be the name
        of the class this signal is for, and the name of the signal.
        They are used to create a globally-unique name."""
        self.namespace = namespace
        self.name = name
        self.fullname = namespace + '.' + name
        self.interceptor = None
        self.local_functions = []
        self.functions = []
        # register this signal
        self.signals[self.fullname] = self

    def register(self, func):
        """Register a function to be called when this signal is
        emitted. Functions registered in this way will always run in
        the main Python instance."""
        self.functions.append(func)
        return func

    def register_local(self, func):
        """Register a function to be called when this signal is
        emitted. Functions registered in this way will always run in
        the Python instance in which the signal was emitted."""
        self.local_functions.append(func)
        return func

    def set_interceptor(self, func):
        """Sets an interceptor function. If present, this function is
        called instead of the non-locally registered functions. It
        should be used by JobManagers to intercept signal
        emissions."""
        self.interceptor = func

    def emit(self, *args, **kwargs):
        """Emits the signal with the given arguments."""
        for func in self.local_functions:
            func(*args, **kwargs)
        if self.interceptor:
            self.interceptor(*args, **kwargs)
            return
        for func in self.functions:
            func(*args, **kwargs)

    def emit_intercepted(self, *args, **kwargs):
        """Re-emits an intercepted signal, and finishes the work that
        would have been done during the original emission. This
        should be used by JobManagers to re-emit signals intercepted
        in worker Python instances."""
        for func in self.functions:
            func(*args, **kwargs)

    # convenience: emit a signal by calling it
    def __call__(self, *args, **kwargs):
        self.emit(*args, **kwargs)
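##
## a tiny usage sketch (illustrative only; "Example.on_test" is not
## part of this gist): register a handler, then emit the signal
##
if __name__ == "__main__":
    on_test = Signal("Example", "on_test")

    @on_test.register
    def print_event(value):
        print "on_test emitted with", value

    on_test("hello")  # __call__ forwards to emit, which runs print_event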
## test.py (filename assumed)
import hashlib
import string
import jobs
import signals
##
## a test job provider
##
def md5(s):
    return hashlib.md5(s).hexdigest()

class HashChecker(jobs.JobProvider):
    on_find = signals.Signal("HashChecker", "on_find")

    def __init__(self, length, letters=string.ascii_lowercase, hasher=md5, target="000000"):
        super(HashChecker, self).__init__()
        self.length = length
        self.letters = letters
        self.num_letters = len(letters)
        self.hasher = hasher
        self.target = target

    def get_jobs(self):
        # each job is a (min, max) range of indices into the search
        # space; round up so the final partial batch is not dropped
        batch_size = 32 ** 4
        space_size = self.num_letters ** self.length
        batches = (space_size + batch_size - 1) // batch_size
        i = 0
        while i < batches:
            yield (batch_size * i, min(batch_size * (i + 1), space_size))
            i += 1

    def do_job(self, min_max):
        for i in xrange(*min_max):
            # decode the index into a string over self.letters
            n = i
            source_letters = []
            while n != 0:
                source_letters.append(n % self.num_letters)
                n = n // self.num_letters
            while len(source_letters) < self.length:
                source_letters.append(0)
            source = ''.join(self.letters[j] for j in source_letters)
            self.check_hash(source)

    def check_hash(self, src):
        digest = self.hasher(src)
        if digest.startswith(self.target):
            self.on_find(src, digest)

@HashChecker.on_find.register
def on_find_listener(src, digest):
    print "found", repr(src), "hashes to", digest

if __name__ == "__main__":
    manager = jobs.MultiprocessingJobManager(3)
    provider1 = HashChecker(5, letters=string.ascii_letters + string.digits + ' ', target="deadbeaf")
    manager.do_all_jobs([provider1])
@agrif commented Dec 13, 2011:

Flow chart detailing how signals work: http://i.imgur.com/DoSs2.png

One other thing: the @signal decorator essentially turns a method into a Signal object. So,

class A:
    @signal("A")
    def do_stuff(self):
        return "stuff"

defines a class A with signal A.do_stuff, and the default handler returns the string "stuff". This is a syntax I liked, but the whole thing can be rewritten without default handlers or the decorator like so:

class A:
    on_test = Signal("A", "on_test")

If you go this way though, you lose one really neat side-effect of the decorator: calling self.on_test() automatically emits the "on_test" signal with self as the first argument.
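For reference, here is one way such a decorator could be written. This is a sketch, not code from the gist: the MethodSignal name is invented here, and the self-binding behavior relies on the descriptor protocol, so it assumes the class using the decorator is new-style (inherits from object).

    class MethodSignal(Signal):
        # when looked up on an instance, bind that instance, so that
        # self.do_stuff() emits with self as the first argument
        def __get__(self, obj, objtype=None):
            if obj is None:
                return self
            def bound(*args, **kwargs):
                return self.emit(obj, *args, **kwargs)
            return bound

    def signal(namespace):
        def decorator(func):
            sig = MethodSignal(namespace, func.__name__)
            sig.register_local(func)  # the method body becomes the default handler
            return sig
        return decorator

Class-level access (as in @A.do_stuff.register) still returns the signal itself, so registration keeps working, while instance-level access gives you the auto-emitting bound form.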
