Skip to content

Instantly share code, notes, and snippets.

@ralphm
Created April 6, 2015 22:26
Show Gist options
  • Save ralphm/1d13f922485452aef4c6 to your computer and use it in GitHub Desktop.
Save ralphm/1d13f922485452aef4c6 to your computer and use it in GitHub Desktop.
Cancel on timeout
from twisted.internet import defer, reactor
from twisted.python import log
from twisted.trial.unittest import TestCase
def foo(timeout):
    """
    Actual code returning a deferred that cancels its DelayedCalls.

    A delayed call is scheduled with the reactor to fire the returned
    deferred with None after the given number of seconds. Cancelling the
    deferred cancels that delayed call.

    timeout is the delay, in seconds, passed to reactor.callLater.

    Returns the deferred.
    """
    def canceller(d):
        # dc is bound further down, but always before the deferred is
        # returned to a caller, so it exists by the time anyone can cancel.
        dc.cancel()

    d = defer.Deferred(canceller)
    dc = reactor.callLater(timeout, d.callback, None)
    return d
def timeouter(result, d):
    """
    Trap timeout to cancel given deferred.

    Intended for use as an errback. result is the failure being handled;
    anything other than defer.TimeoutError is re-raised by trap() and d is
    left alone. On a timeout, d is cancelled.

    Returns the same failure, so the timeout keeps propagating down the
    errback chain this function was added to.
    """
    # trap() re-raises the failure unless it wraps a TimeoutError, so the
    # cancel below only happens for genuine timeouts.
    result.trap(defer.TimeoutError)
    d.cancel()
    return result
def chainIgnoreCancel(d, d2):
    """
    Chain d to d2, ignoring d being cancelled.

    A successful result of d is passed to d2.callback. A failure of d is
    passed to d2.errback, unless it is a defer.CancelledError, in which case
    d2 is not touched.

    Returns None.
    """
    def cb(result):
        d2.callback(result)

    def eb(result):
        # A cancellation of d is swallowed here; every other failure is
        # forwarded to d2.
        if not result.check(defer.CancelledError):
            d2.errback(result)

    d.addCallbacks(cb, eb)
class Tests(TestCase):
    """
    Demonstrates cancelling an underlying deferred when the test times out.
    """

    # Used below to make foo() fire one second later than this value.
    # NOTE(review): presumably also picked up by trial as the per-test
    # timeout in seconds -- confirm against the trial documentation.
    timeout = 2

    def tearDown(self):
        log.msg("hi")

    def test_timeout(self):
        """
        Return a deferred that is still pending after self.timeout seconds.

        The deferred from foo() is set to fire one second after
        self.timeout. If the returned deferred fails with a timeout first,
        timeouter cancels the deferred from foo(), and chainIgnoreCancel
        keeps the resulting cancellation from reaching the returned
        deferred.
        """
        d = defer.Deferred()
        dReal = foo(self.timeout + 1)
        # Errback goes on d before the chaining so a timeout failure
        # delivered to d cancels dReal.
        d.addErrback(timeouter, dReal)
        chainIgnoreCancel(dReal, d)
        return d
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment