Created
August 21, 2011 06:18
-
-
Save heckj/1160238 to your computer and use it in GitHub Desktop.
Eventlet based actor class (from eventlet/coros.py)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import collections | |
import warnings | |
import eventlet | |
from eventlet import event | |
from eventlet import greenpool | |
from eventlet import greenthread | |
class Actor(object): | |
""" A free-running coroutine that accepts and processes messages. | |
Kind of the equivalent of an Erlang process, really. It processes | |
a queue of messages in the order that they were sent. You must | |
subclass this and implement your own version of :meth:`received`. | |
The actor's reference count will never drop to zero while the | |
coroutine exists; if you lose all references to the actor object | |
it will never be freed. | |
""" | |
def __init__(self, concurrency = 1): | |
""" Constructs an Actor, kicking off a new coroutine to process the messages. | |
The concurrency argument specifies how many messages the actor will try | |
to process concurrently. If it is 1, the actor will process messages | |
serially. | |
""" | |
warnings.warn("We're phasing out the Actor class, so as to get rid of" | |
"the coros module. If you use Actor, please speak up on " | |
"eventletdev@lists.secondlife.com, and we'll come up with a " | |
"transition plan. If no one speaks up, we'll remove Actor " | |
"in a future release of Eventlet.", | |
DeprecationWarning, stacklevel=2) | |
self._mailbox = collections.deque() | |
self._event = event.Event() | |
self._killer = eventlet.spawn(self.run_forever) | |
self._pool = greenpool.GreenPool(concurrency) | |
def run_forever(self): | |
""" Loops forever, continually checking the mailbox. """ | |
while True: | |
if not self._mailbox: | |
self._event.wait() | |
self._event = _event.Event() | |
else: | |
# leave the message in the mailbox until after it's | |
# been processed so the event doesn't get triggered | |
# while in the received method | |
self._pool.spawn_n( | |
self.received, self._mailbox[0]) | |
self._mailbox.popleft() | |
def cast(self, message): | |
""" Send a message to the actor. | |
If the actor is busy, the message will be enqueued for later | |
consumption. There is no return value. | |
>>> a = Actor() | |
>>> a.received = lambda msg: msg | |
>>> a.cast("hello") | |
""" | |
self._mailbox.append(message) | |
# if this is the only message, the coro could be waiting | |
if len(self._mailbox) == 1: | |
self._event.send() | |
def received(self, message): | |
""" Called to process each incoming message. | |
The default implementation just raises an exception, so | |
replace it with something useful! | |
>>> class Greeter(Actor): | |
... def received(self, (message, evt) ): | |
... print "received", message | |
... if evt: evt.send() | |
... | |
>>> a = Greeter() | |
This example uses Events to synchronize between the actor and the main | |
coroutine in a predictable manner, but this kinda defeats the point of | |
the :class:`Actor`, so don't do it in a real application. | |
>>> from eventlet.event import Event | |
>>> evt = Event() | |
>>> a.cast( ("message 1", evt) ) | |
>>> evt.wait() # force it to run at this exact moment | |
received message 1 | |
>>> evt.reset() | |
>>> a.cast( ("message 2", None) ) | |
>>> a.cast( ("message 3", evt) ) | |
>>> evt.wait() | |
received message 2 | |
received message 3 | |
>>> eventlet.kill(a._killer) # test cleanup | |
""" | |
raise NotImplementedError() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment