Skip to content

Instantly share code, notes, and snippets.

@jrydberg
Created April 18, 2012 08:50
Show Gist options
  • Save jrydberg/2412129 to your computer and use it in GitHub Desktop.
Save jrydberg/2412129 to your computer and use it in GitHub Desktop.
clock abstraction for gevent
# Copyright 2012 Johan Rydberg.
# Copyright 2001-2008 Twisted Matrix Laboratories.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Simple abstraction of a clock that allows callers to read out the
current time, but also schedule future calls. Scheduled calls can be
cancelled and resetted.
The MockClock and DelayedCall classes and their functionality is based
on the twisted.internet.task.Clock class from Twisted which is
released under a MIT license.
"""
from time import time as _time
from gevent.event import Event
from gevent import spawn_later, kill as kill_greenlet, sleep as gevent_sleep
import gevent
class AlreadyCancelled(Exception):
pass
class AlreadyCalled(Exception):
pass
class DelayedCall(object):
"""Representation of a call in the future."""
def __init__(self, clock, time, func, args, kw, cancel, reset):
self.clock = clock
self.time, self.func, self.args, self.kw = time, func, args, kw
self.resetter = reset
self.canceller = cancel
self.cancelled = self.called = 0
def cancel(self):
"""Unschedule this call."""
if self.cancelled:
raise AlreadyCancelled()
elif self.called:
raise AlreadyCalled()
else:
self.canceller(self)
self.cancelled = 1
if self.debug:
self._str = str(self)
del self.func, self.args, self.kw
def reset(self, seconds_from_now):
"""Reschedule this call for a different time."""
if self.cancelled:
raise AlreadyCancelled()
elif self.called:
raise AlreadyCalled()
else:
new_time = self.clock.time() + seconds_from_now
self.time = new_time
self.resetter(self)
def __le__(self, other):
return self.time <= other.time
class Clock(object):
"""Abstraction of a clock that has functionality for reporting
current time and scheduler functions to be called in the future.
"""
def __init__(self):
self._greenlets = {}
def _call(self, call):
call.called = 1
del self._greenlets[call]
call.func(*call.args, **call.kw)
def _canceller(self, call):
"""Cancel delayed call."""
greenlet = self._greenlets.pop(call, None)
if greenlet is not None:
kill_greenlet(greenlet)
def _resetter(self, call):
"""Schedule or reschedule delayed call."""
self._canceller(call)
self._greenlets[call] = spawn_later(
call.time - self.time(), self._call, call)
def sleep(self, seconds=0):
"""Sleep for C{seconds}."""
gevent_sleep(seconds)
advance = sleep
def call_later(self, seconds, fn, *args, **kw):
"""Call function C{fn} at a later time."""
dc = DelayedCall(self, self.time() + seconds,
fn, args, kw, self._canceller, self._resetter)
self._resetter(dc)
return dc
def time(self):
"""Return current time."""
return _time()
class MockClock(object):
"""Provide a deterministic, easily-controlled version of
L{Clock}.
This is commonly useful for writing deterministic unit tests for
code which schedules events using this API.
"""
right_now = 0.0
def __init__(self):
self.calls = []
def time(self):
"""Pretend to be time.time()."""
return self.right_now
def call_later(self, seconds, fn, *a, **kw):
dc = DelayedCall(self, self.time() + seconds,
fn, a, kw, self.calls.remove, lambda c: None)
self.calls.append(dc)
self.calls.sort(lambda a, b: cmp(a.time, b.time))
return dc
def sleep(self, amount=0):
"""Sleep current greenlet for the specified amount."""
ev = Event()
self.call_later(amount, ev.set)
ev.wait()
def advance(self, amount=0):
"""Move time on this clock forward by the given amount and run
whatever pending calls should be run.
"""
# First we yield the control so that other greenlets have a
# chance to run.
gevent_sleep()
future = self.right_now + amount
while self.calls and self.calls[0].time <= future:
call = self.calls.pop(0)
self.right_now = call.time
call.called = 1
call.func(*call.args, **call.kw)
gevent_sleep()
self.right_now = future
def patch_gevent(c=None, time=False):
"""Patch gevent itself so that gevent.sleep and gevent.spawn_later
uses a clock.
@param time: If true, also patch C{time.time} to use the clock.
"""
if c is None:
c = Clock()
_gevent = __import__('gevent')
_gevent.sleep = c.sleep
# FIXME: we need to wrap this in something that looks like a
# greenlet so that it can be killed with gevent.kill.
_gevent.spawn_later = c.call_later
if time:
_time = __import__('time')
_time.time = c.time
if __name__ == '__main__':
def test(c):
def fn(*args, **kw):
print c.time(), "fn", args, kw
dc = c.call_later(2, fn, 1, 2, a="a", b="b")
print "start to sleep"
c.sleep(2)
print "back again"
def test2():
import gevent, time
def fn(*args, **kw):
print time.time(), "fn", args, kw
print "start to sleep"
gevent.spawn_later(2, fn, 1, 2, a="a", b="b")
gevent.sleep(2)
print "back again"
c = MockClock()
gevent.spawn(test, c)
c.advance(4)
patch_gevent(c, time=True)
gevent.spawn(test2)
c.advance(4)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment