Skip to content

Instantly share code, notes, and snippets.

@atl
Created November 29, 2009 09:46
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save atl/244866 to your computer and use it in GitHub Desktop.
Save atl/244866 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
"""worker.py: An implementation of an event-based asynchronous pattern.
Uses generators to allow asynchronous tasks to be coded in-line. Allows the
implementer to define what technique is used to execute asynchronously and
which generator iterations are executed asynchronously.
References:
http://code.activestate.com/recipes/576952/ [inline asynchronous code]
"""
# Snipped from:
# http://code.activestate.com/recipes/576965/
# simply so I won't forget it. Also referenced from:
# http://www.reddit.com/r/Python/comments/a8bjw
#
# By Glenn Eychaner http://code.activestate.com/recipes/users/4172294/
import os
import sys
import threading
import traceback
try:
    from functools import wraps
except ImportError:
    # functools.wraps appeared in Python 2.5; provide a minimal stand-in.
    def wraps(wrapped):
        """Fallback wraps() decorator for Python < 2.5.

        Copies the wrapped function's identity metadata (name, docstring,
        module, and instance dict) onto the wrapper function.
        """
        def decorate(wrapper):
            for attr in ('__name__', '__doc__', '__module__'):
                setattr(wrapper, attr, getattr(wrapped, attr))
            wrapper.__dict__.update(wrapped.__dict__)
            return wrapper
        return decorate
if __name__ == '__main__':
    # Imports and metadata needed only by the demo / self-test at the
    # bottom of this file; regular importers of this module skip them.
    import optparse
    import Queue
    import time
    # Extract the revision number from the RCS/SVN keyword string
    # ('$Revision: 0 $'.split() -> ['$Revision:', '0', '$'], take index 1).
    __version__ = '$Revision: 0 $'.split()[1]
    __usage__ = 'usage: %prog [options]'
def async(worker_factory):
"""Decorator which wraps a generator in an executor object that can
iterate the generator asynchronously.
The decorator takes a single argument; a factory (callable) which takes
a generator as its argument and returns an asynchronous executor object.
(The executor object is not required to be a subclass of AsyncExecutor.)
The decorator passes the generator to the factory to create the executor,
and then returns the executor to the caller. The caller can then start
the executor to begin iterating the generator.
"""
def async_factory(generator):
@wraps(generator)
def factory(*arglist, **keywmap):
work_iter = generator(*arglist, **keywmap)
worker = worker_factory(work_iter)
return worker
return factory
return async_factory
class AsyncExecutor:
    """A skeletal base class for asynchronous executors.

    Does not implement any asynchronous behavior on its own; it is expected
    that the subclass will define methods which call the next() or execute()
    method asynchronously.
    """
    def __init__(self, generator):
        # The generator iterator whose steps this executor drives.
        self._generator = generator

    def __iter__(self):
        return self

    def next(self):
        """Advance the wrapped generator one step and return its value."""
        # Use the next() builtin (Python 2.6+) rather than the Py2-only
        # generator.next() method, so the class also works on Python 3.
        return next(self._generator)

    # Python 3 spells the iterator-protocol method __next__; alias it so
    # instances remain iterable there.  Purely additive for Python 2.
    __next__ = next

    def execute(self):
        """Convenience wrapper around next() which traps exceptions, to
        avoid lots of try...except in subclass code.

        Returns a 2-element tuple containing the yielded value and True, or
        the exception and False if the generator raised an exception.  Also
        calls handle_exception() for an exception other than StopIteration.
        """
        try:
            return (self.next(), True)
        except StopIteration as si:
            # Normal exhaustion of the generator -- not an error.
            return (si, False)
        except Exception as e:
            self.handle_exception(e)
            return (e, False)

    def handle_exception(self, exception):
        """Handle an exception raised by the generator. By default, just
        prints a traceback to stderr."""
        traceback.print_exc(file=sys.stderr)
class ThreadExecutor(AsyncExecutor):
    """An executor which runs alternate iterations of a generator in a
    separate thread and in an event queue.

    Thread-side iterations must yield a callable that enqueues work into an
    event queue (e.g. Queue.put); the queued continuation then performs the
    next iteration on the event-loop side.  An iteration can yield False or
    None to signal the end of the generator to the executor.
    """
    # NOTE: the redundant __init__(*args, **kwargs) pass-through override was
    # removed; the inherited AsyncExecutor.__init__ behaves identically.

    def run(self):
        """In a new thread, call execute() and pass the result to finish()."""
        threading.Thread(target=lambda: self.finish(self.execute())).start()

    def finish(self, run_yielded):
        """Given the (value, success) pair produced by run()'s iteration,
        use the yielded callable to enqueue a continuation into the event
        queue.  The continuation runs the next (event-queue side) iteration
        and, if it both succeeds and yields a truthy value, schedules the
        following thread-side iteration via run()."""
        value, succeeded = run_yielded
        if callable(value) and succeeded:
            def continuation():
                # Equivalent to the original
                #   reduce(lambda a, b: a and b, self.execute()) and self.run()
                # but without the reduce builtin, which no longer exists as a
                # builtin in Python 3: continue only when the iteration
                # succeeded AND yielded a truthy value.
                next_value, ok = self.execute()
                return next_value and ok and self.run()
            value(continuation)
if __name__ == '__main__':
    class TestEventQueue(Queue.Queue):
        """An event queue which executes callable entries, looping as long
        as there are more entries or active threads."""
        def loop(self, looptime=0.5):
            # The main thread always counts as one; keep pumping while any
            # worker thread is alive or entries remain queued.
            while threading.activeCount() > 1 or not self.empty():
                print "Event loop %s (%d threads)" % (
                    threading.currentThread().getName(), threading.activeCount())
                try:
                    # Block briefly for the next event so the thread count is
                    # re-checked at least every `looptime` seconds.
                    next = self.get(timeout=looptime)
                    if callable(next): next()
                except Queue.Empty: pass

    @async(ThreadExecutor)
    def work(event_queue, threads=5, worktime=2.0):
        """Demo generator: alternates between thread-side work (the sleep)
        and event-queue-side work, `threads` times over."""
        count = 1
        while (count <= threads):
            # Work performed in separate thread
            print "Work %d started in %s" % (count,
                threading.currentThread().getName())
            time.sleep(worktime)
            print "Work %d ended in %s" % (count,
                threading.currentThread().getName())
            # Yield the queue's put method so the executor enqueues the
            # continuation; the next iteration runs in the event loop.
            yield event_queue.put
            # Work performed in event queue
            print "Work %d finished in %s" % (count,
                threading.currentThread().getName())
            count += 1
            # False ends the cycle; True schedules another thread iteration.
            yield count <= threads

    optparser = optparse.OptionParser(usage=__usage__, version=__version__)
    optparser.disable_interspersed_args()
    optparser.add_option('--threads', type='int', metavar='N', default=5,
        help='Number of threads to spawn')
    optparser.add_option('--looptime', type='float', metavar='N', default=0.5,
        help='Timeout for event loop')
    optparser.add_option('--worktime', type='float', metavar='N', default=2.0,
        help='Time for worker thread to execute')
    (options, args) = optparser.parse_args()
    # Return options as dictionary.
    optdict = lambda *args: dict([(k, getattr(options, k)) for k in args])
    # Create and queue the first worker, and then loop
    q = TestEventQueue()
    q.put(work(q, **optdict('threads', 'worktime')).run)
    q.loop(**optdict('looptime'))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment