Skip to content

Instantly share code, notes, and snippets.

@devdave
Created November 1, 2011 18:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save devdave/1331438 to your computer and use it in GitHub Desktop.
Save devdave/1331438 to your computer and use it in GitHub Desktop.
Deferred events
from collections import namedtuple
from twisted.internet.defer import Deferred
UserEvent = namedtuple("UserEvent", "subject, args, kwargs")
class UserEventStack(object):
def __init__(self, user):
self.events = []
self.user = user
self.d = None
def __len__(self):
return len(self.events)
def __iter__(self):
return self.events.__iter__()
def onEvent(self, subject, *args, **kwargs):
"""
Entry point for producers
"""
self.tell(subject, *args, **kwargs)
if self.d and self.d.called != True:
self.d.callback(self)
def tell(self, subject, *args, **kwargs):
"""
Shortcut for direct event injection
"""
self.events.append(UserEvent(subject, args, kwargs))
def getAll(self):
"""
Not exactly perfect but at point of call grabs all "events"
and empties the event stack
"""
events, self.events = self.events[:], []
return events
def check(self):
"""
Call point for consumers
"""
if self.d is None:
self.d = Deferred()
elif self.d.called == True:
self.d = Deferred()
if self.events:
self.d.callback(self)
return self.d
def reset(self):
self.d.cancel()
self.d = Deferred()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment