Skip to content

Instantly share code, notes, and snippets.

@heckj
Created August 21, 2011 06:18
Show Gist options
  • Save heckj/1160238 to your computer and use it in GitHub Desktop.
Save heckj/1160238 to your computer and use it in GitHub Desktop.
Eventlet-based actor class (from eventlet/coros.py)
import collections
import warnings
import eventlet
from eventlet import event
from eventlet import greenpool
from eventlet import greenthread
class Actor(object):
    """A free-running coroutine that accepts and processes messages.

    Kind of the equivalent of an Erlang process, really.  It processes
    a queue of messages in the order that they were sent.  You must
    subclass this and implement your own version of :meth:`received`.

    The actor's reference count will never drop to zero while the
    coroutine exists; if you lose all references to the actor object
    it will never be freed.
    """

    def __init__(self, concurrency=1):
        """Construct an Actor and kick off a coroutine to process messages.

        :param concurrency: how many messages the actor will try to
            process concurrently.  If it is 1, the actor will process
            messages serially.

        Emits a :class:`DeprecationWarning` on every construction.
        """
        warnings.warn(
            "We're phasing out the Actor class, so as to get rid of "
            "the coros module. If you use Actor, please speak up on "
            "eventletdev@lists.secondlife.com, and we'll come up with a "
            "transition plan. If no one speaks up, we'll remove Actor "
            "in a future release of Eventlet.",
            DeprecationWarning, stacklevel=2)
        # Pending messages, oldest on the left.
        self._mailbox = collections.deque()
        # Signalled by cast() when a message arrives in an empty mailbox.
        self._event = event.Event()
        # Create the pool *before* spawning the loop so run_forever() can
        # never observe a partially initialised actor, whatever the
        # scheduling order turns out to be.
        self._pool = greenpool.GreenPool(concurrency)
        # Handle on the processing coroutine; kill it to stop the actor.
        self._killer = eventlet.spawn(self.run_forever)

    def run_forever(self):
        """Loop forever, dispatching mailbox messages to :meth:`received`.

        Never returns normally; the loop only ends when the coroutine
        held in ``self._killer`` is killed or an exception propagates.
        """
        while True:
            if not self._mailbox:
                # Nothing to do: sleep until cast() signals a new message,
                # then install a fresh event for the next idle period.
                self._event.wait()
                self._event = event.Event()
            else:
                # Leave the message in the mailbox until after it has
                # been handed to the pool, so that a cast() made in the
                # meantime sees a non-empty mailbox and does not trigger
                # the event.
                self._pool.spawn_n(self.received, self._mailbox[0])
                self._mailbox.popleft()

    def cast(self, message):
        """Send a message to the actor.

        If the actor is busy, the message will be enqueued for later
        consumption.  There is no return value.

        >>> a = Actor()
        >>> a.received = lambda msg: msg
        >>> a.cast("hello")
        """
        self._mailbox.append(message)
        # If this is the only message, the coroutine could be waiting
        # on the event, so wake it up.
        if len(self._mailbox) == 1:
            self._event.send()

    def received(self, message):
        """Called to process each incoming message.

        The default implementation just raises an exception, so
        replace it with something useful!

        :raises NotImplementedError: always, unless overridden.

        >>> class Greeter(Actor):
        ...     def received(self, message):
        ...         text, evt = message
        ...         print("received %s" % text)
        ...         if evt: evt.send()
        ...
        >>> a = Greeter()

        This example uses Events to synchronize between the actor and the
        main coroutine in a predictable manner, but this kinda defeats the
        point of the :class:`Actor`, so don't do it in a real application.

        >>> from eventlet.event import Event
        >>> evt = Event()
        >>> a.cast( ("message 1", evt) )
        >>> evt.wait()  # force it to run at this exact moment
        received message 1
        >>> evt.reset()
        >>> a.cast( ("message 2", None) )
        >>> a.cast( ("message 3", evt) )
        >>> evt.wait()
        received message 2
        received message 3

        >>> eventlet.kill(a._killer)  # test cleanup
        """
        raise NotImplementedError()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment