Skip to content

Instantly share code, notes, and snippets.

@shylent
Created October 19, 2010 08:35
Show Gist options
  • Save shylent/633843 to your computer and use it in GitHub Desktop.
Save shylent/633843 to your computer and use it in GitHub Desktop.
'''
@author: shylent
'''
from twisted.internet import reactor
from twisted.python import log
class Spent(Exception):
    """Raised when a scheduler that has already finished is asked to reschedule."""
def no_op(*args, **kwargs):
    """Accept any positional and keyword arguments, do nothing, return None."""
    return None
class SequentialTimeout(object):
    """Call a given callable at intervals specified by the C{timeout} iterable.

    Optionally calls a timeout handler, if provided, when there are no more
    timeout values.

    @param timeout: an iterable that yields valid C{_seconds} arguments to
        L{callLater<twisted.internet.interfaces.IReactorTime.callLater>}
        (floats)
    @type timeout: any iterable
    @param callable: the callable that will be called at the specified
        intervals
    @param callable_args: the arguments to call it with
    @param callable_kwargs: the keyword arguments to call it with
    @param on_timeout: the callable that will be called when there are no
        more timeout values
    @param on_timeout_args: the arguments to call it with
    @param on_timeout_kwargs: the keyword arguments to call it with
    """

    @classmethod
    def run(cls, timeout, callable, callable_args=None, callable_kwargs=None,
            on_timeout=None, on_timeout_args=None, on_timeout_kwargs=None):
        """Create a L{SequentialTimeout} object and start its scheduler cycle.

        @see: L{SequentialTimeout}
        @return: the newly created, already scheduled instance
        """
        inst = cls(timeout, callable, callable_args, callable_kwargs,
                   on_timeout, on_timeout_args, on_timeout_kwargs)
        inst.reschedule()
        return inst

    def __init__(self, timeout,
                 callable, callable_args=None, callable_kwargs=None,
                 on_timeout=None, on_timeout_args=None,
                 on_timeout_kwargs=None):
        # Keep a single iterator so every reschedule() consumes the next value.
        self._timeout = iter(timeout)
        self.callable = callable
        self.callable_args = callable_args or []
        self.callable_kwargs = callable_kwargs or {}
        self.on_timeout = on_timeout or no_op
        self.on_timeout_args = on_timeout_args or []
        self.on_timeout_kwargs = on_timeout_kwargs or {}
        # The pending delayed call returned by reactor.callLater, if any.
        self._wd = None
        # Set once the timeouts are exhausted or cancel() has been called.
        self._spent = False

    def _call_and_schedule(self):
        """Invoke the callable, then schedule the following call."""
        self.callable(*self.callable_args, **self.callable_kwargs)
        # The callable may have called cancel() on this object; in that case
        # stop quietly instead of raising Spent from inside the reactor.
        if not self._spent:
            self.reschedule()

    def reschedule(self):
        """Schedule the next L{callable} call.

        When the timeout iterator is exhausted, the C{on_timeout} handler is
        called instead and the object becomes spent.

        @raise Spent: if the timeout iterator has been exhausted and the
            on_timeout handler has already been called, or if L{cancel} has
            been called
        """
        if self._spent:
            raise Spent()
        # Only the iterator advance is guarded, so a StopIteration coming from
        # anywhere else is not mistaken for exhaustion. The builtin next()
        # works with both Python 2 (2.6+) and Python 3 iterators.
        try:
            next_timeout = next(self._timeout)
        except StopIteration:
            self.on_timeout(*self.on_timeout_args, **self.on_timeout_kwargs)
            self._spent = True
            return
        log.msg("Next timeout: %s" % (next_timeout,))
        self._wd = reactor.callLater(next_timeout, self._call_and_schedule)

    def cancel(self):
        """Cancel the next scheduled call and mark this object as spent."""
        if self._wd is not None and self._wd.active():
            self._wd.cancel()
        self._spent = True
def main():
    """Demo: log "Quack" after 0, 1, 3 and 6 second delays, then stop the reactor."""
    import sys

    # Send log output to stdout before anything gets scheduled.
    log.startLogging(sys.stdout)

    def emit_quack():
        log.msg("Quack")

    def on_exhausted():
        # No timeout values are left: report it and shut the reactor down.
        log.msg("Timed out")
        reactor.stop()

    SequentialTimeout.run((0, 1, 3, 6), emit_quack, on_timeout=on_exhausted)
if __name__ == "__main__":
    # Register main() to be called once the reactor is running, then start
    # the reactor; main()'s timeout handler is what stops it again.
    reactor.callWhenRunning(main)
    reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment