Skip to content

Instantly share code, notes, and snippets.

@adiroiban
Last active November 21, 2015 12:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save adiroiban/a27689106f841b7bb508 to your computer and use it in GitHub Desktop.
Save adiroiban/a27689106f841b7bb508 to your computer and use it in GitHub Desktop.
LoopingCall changes
class ChevahLoopingCall(object):
    """
    Wrapper for looping call to keep a reference to current loop deferred.

    The wrapped `LoopingCall` is driven by `_loopedCall`, which invokes the
    user supplied `callback` and remembers its result in `_ongoingDeferred`,
    so that callers can wait for the iteration which is in progress.

    `clock`, `running` and `deferred` are forwarded to the wrapped
    `LoopingCall`, so the wrapper can be used where those attributes of a
    looping call are expected.
    """

    def __init__(self, callback):
        self._context = LoopingCall(self._loopedCall)
        self._callback = callback
        # Result of the last call to `callback`. None until the first
        # iteration was executed.
        self._ongoingDeferred = None

    @property
    def clock(self):
        """
        Scheduler used by the wrapped looping call.
        """
        return self._context.clock

    @clock.setter
    def clock(self, value):
        # Set on the wrapped looping call, otherwise the value would only
        # be stored on the wrapper and never used for scheduling.
        self._context.clock = value

    @property
    def running(self):
        """
        `running` flag of the wrapped looping call.
        """
        return self._context.running

    @property
    def deferred(self):
        """
        `deferred` attribute of the wrapped looping call.
        """
        return self._context.deferred

    def start(self, interval):
        """
        Start the wrapped looping call using `interval` and return what
        its `start` method returns.
        """
        return self._context.start(interval)

    def stop(self):
        """
        Stop the wrapped looping call.
        """
        return self._context.stop()

    def _loopedCall(self):
        """
        Called by the wrapped looping call at each iteration.

        Calls the wrapped callback, keeps a reference to its result and
        returns it to the looping call.
        """
        deferred = self._callback()
        self._ongoingDeferred = deferred
        return deferred
class SnapshotLoop(object):
    """
    A generic loop which polls for the state of a path using the `driver` and
    triggers the changes using the event `handler`.

    `driver` is the ILocation.getFolderMemberAttributes
    `handler` is an EventHandler

    `path` is passed to the `driver` at each iteration.
    `interval` is passed to the loop when it is started.
    `on_error` is stored as `_onError`.
    `scheduler` is assigned as the `clock` of the loop.

    NOTE(review): `_ebLoopingCall` and `_getChanges` are used by this class
    but are not defined in this excerpt. `_onError` and `_recursive` are
    stored but not read here - presumably they are used by those methods.
    """

    def __init__(
        self, driver, handler, path, recursive, interval, on_error,
        scheduler=reactor,
            ):
        self._snapshot = {}
        self._driver = driver
        self._handler = handler
        self._path = path
        self._recursive = recursive
        self._interval = interval
        self._loop = ChevahLoopingCall(self._onSnapshot)
        # NOTE(review): this only has an effect if the loop object forwards
        # `clock` to the underlying looping call - confirm.
        self._loop.clock = scheduler
        self._onError = on_error

    def start(self):
        """
        Start watching the path.
        """
        self._loop.start(self._interval)
        # ChevahLoopingCall is patched to not delete deferred on errors
        # so that we can still attach errbacks to it.
        self._loop.deferred.addErrback(self._ebLoopingCall)

    def stop(self):
        """
        Stop watching the path.

        Does nothing when the loop is not running.
        """
        if self._loop.running:
            self._loop.stop()

    @defer.inlineCallbacks
    def _onSnapshot(self):
        """
        Called when a new snapshot should be made.

        Asks the driver for the members of the path and passes each
        detected change to the handler.
        """
        members = yield self._driver(self._path)
        changes = self._getChanges(members)
        for change in changes:
            self._handler.handle(change)
class SnapshotTest(TestCase):
    """
    Tests for the snapshot which does not touch the network.
    """

    # Fake clock used as scheduler, so that the tests don't wait for real
    # time to pass.
    clock = task.Clock()

    def test_error_handler_is_used_on_errors(self):
        """
        Here LoopingCall.__call__ was updated to keep the deferred after
        calling errback so that we can still catch the error on start.
        """
        path = mk.text()
        errors = []
        changes, handler = self.getHandler()
        dir_members, driver = self.getDriver()
        sut = SnapshotLoop(
            driver=driver,
            handler=handler,
            # `path` is a required argument of SnapshotLoop.
            path=path,
            recursive=False,
            interval=10,
            on_error=lambda failure: errors.append(failure),
            scheduler=self.clock,
            )

        sut.start()

        # NOTE(review): `on_error` names its argument `failure`. Confirm
        # that what is collected compares equal to the raw exception.
        self.assertEqual([NoSuchPath(path)], errors)
class FunctionalSnapshotTest(MyNetworkReactorTestCase):
    """
    Functional test which will use a Clock for scheduling but will use a
    driver which touches the network
    """

    @defer.inlineCallbacks
    def test_some_kind_of_changes(self):
        """
        Test that when a directory has some changes it will notify those
        changes via the handler.
        """
        remoteDirectory = self.getRemoteDirectory()
        path = '/'
        errors = []
        changes, handler = self.getHandler()
        dir_members, driver = self.getNetworkDriver()
        sut = SnapshotLoop(
            driver=driver,
            handler=handler,
            # `path` is a required argument of SnapshotLoop.
            path=path,
            recursive=False,
            interval=10,
            on_error=lambda failure: errors.append(failure),
            scheduler=self.clock,
            )

        sut.start()
        # Wait for first network snapshot to be taken.
        yield sut._loop._ongoingDeferred

        self.assertIsEmpty(changes)
        self.assertIsEmpty(errors)

        yield remoteDirectory.createFile('/some-name', 'some-content')
        # Check changes on next loop.
        self.clock.advance(10)
        # Wait for second snapshot.
        yield sut._loop._ongoingDeferred

        self.assertEqual([CreateEvent('/some-name')], changes)
        self.assertIsEmpty(errors)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment