Last active
November 21, 2015 12:28
-
-
Save adiroiban/a27689106f841b7bb508 to your computer and use it in GitHub Desktop.
LoopingCall changes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ChevahLoopingCall(object): | |
""" | |
Wrapper for looping call to keep a reference to current loop deferred. | |
""" | |
def __init__(self, callback): | |
self._context = LoopingCall(self._loopedCall) | |
self._callback = callback | |
self._ongoingDeferred = None | |
def start(self, inteval): | |
return self._context.start(interval) | |
def stop(self): | |
return self._context.stop() | |
def _loopedCall(self): | |
deferred = callback() | |
return deferred | |
class SnapshotLoop(object): | |
""" | |
A generic loop which polls for the state of a path using the `driver` and | |
trigger the changes using the event `handler`. | |
`driver` is the ILocation.getFolderMemberAttributes | |
`handler` is an EventHandler | |
""" | |
def __init__( | |
self, driver, handler, path, recursive, interval, on_error, | |
scheduler=reactor, | |
): | |
self._snapshot = {} | |
self._driver = driver | |
self._handler = handler | |
self._path = path | |
self._recursive = recursive | |
self._interval = interval | |
self._loop = ChevahLoopingCall(self._onSnapshot) | |
self._loop.clock = scheduler | |
self._onError = on_error | |
def start(self): | |
""" | |
Start watching the path. | |
""" | |
self._loop.start(self._interval) | |
# ChevahLoopingCall is patched to not delete deferred on errors | |
# so that we can still attach errbacks to it. | |
self._loop.deferred.addErrback(self._ebLoopingCall) | |
def stop(self): | |
""" | |
Stop watching the path. | |
""" | |
if self._loop.running: | |
self._loop.stop() | |
@defer.inlineCallbacks | |
def _onSnapshot(self): | |
""" | |
Called when a new snapshot should be made. | |
""" | |
member = yield self._driver(self._path) | |
changes = self._getChanges(members) | |
for change in changes: | |
self._handler.handle(change) | |
class SnapshotTest(TestCase): | |
""" | |
Tests for the snapshot which does not touch the network. | |
""" | |
clock = task.Clock() | |
def test_error_handler_is_used_on_errors(self): | |
""" | |
Here LoopingCall.__call__ was updated to keep the deferred after | |
calling errback so that we can still catch the error on start. | |
""" | |
path = mk.text() | |
errors = [] | |
changes, handler = self.getHandler() | |
dir_members, driver = self.getDriver() | |
sut = SnapshotLoop( | |
driver=driver, | |
handler=handler, | |
recursive=false, | |
interval=10, | |
on_error=lambda failure: errors.append(failure), | |
scheduler=self.clock, | |
) | |
sut.start() | |
self.assertEqual([NoSuchPath(path)], errors) | |
class FunctionalSnapshotTest(MyNetworkReactorTestCase): | |
""" | |
Functional test which will use a Clock for scheduling but will use a | |
driver which touches the network | |
""" | |
@defer.inlineCallbacks | |
def test_some_kind_of_changes(self): | |
""" | |
Test that when a directory has some changes it will notify those | |
changes via the handler. | |
""" | |
remoteDirectory = self.getRemoteDirectory() | |
path = '/' | |
errors = [] | |
changes, handler = self.getHandler() | |
dir_members, driver = self.getNetworkDriver() | |
sut = SnapshotLoop( | |
driver=driver, | |
handler=handler, | |
recursive=false, | |
interval=10, | |
on_error=lambda failure: errors.append(failure), | |
scheduler=self.clock, | |
) | |
sut.start() | |
# Wait for first network snapshot to be taken. | |
yield sut._loop._ongoingDeferred | |
self.assertIsEmpty(changes) | |
self.assertIsEmpty(errors) | |
yield remoteDirectory.createFile('/some-name', 'some-content') | |
# Check changes on next loop. | |
self.clock.advance(10) | |
# Wait for second snapshot. | |
yield sut._loop._ongoingDeferred | |
self.assertEqual([CreateEvent('/some-name')], changes) | |
self.assertIsEmpty(errors) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment