Created
June 29, 2009 09:05
-
-
Save schuyler/137537 to your computer and use it in GitHub Desktop.
scheduler patch for RapidSMS
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/lib/rapidsms/router.py b/lib/rapidsms/router.py | |
index 0785499..ddecd0a 100644 | |
--- a/lib/rapidsms/router.py | |
+++ b/lib/rapidsms/router.py | |
@@ -1,7 +1,7 @@ | |
#!/usr/bin/env python | |
# vim: ai ts=4 sts=4 et sw=4 | |
-import time, datetime, os | |
+import time, datetime, os, heapq | |
import threading | |
import traceback | |
@@ -16,6 +16,7 @@ class Router (component.Receiver): | |
component.Receiver.__init__(self) | |
self.backends = [] | |
self.apps = [] | |
+ self.events = [] | |
self.running = False | |
self.logger = None | |
@@ -179,6 +180,39 @@ class Router (component.Receiver): | |
except Exception: | |
self.log_last_exception("The %s backend failed to stop" % backend.slug) | |
+ """ | |
+ The call_at() method schedules a callback with the router. It takes as its | |
+ first argument one of several possible objects: | |
+ | |
+ * if an int or float, the callback is called that many | |
+ seconds later. | |
+ * if a datetime.datetime object, the callback is called at | |
+ the time specified. | |
+ * if a datetime.timedelta object, the callback is called | |
+ at the timedelta from now. | |
+ | |
+ call_at() takes as its second argument the function to be called | |
+ at the scheduled time. All other positional and keyword arguments | |
+ are passed directly to the callback when it is called. | |
+ | |
+ If the callback returns a true value that is one of the time objects | |
+ understood by call_at(), the callback will be called again at the | |
+ specified time with the same arguments. | |
+ """ | |
+ def call_at (self, when, callback, *args, **kwargs): | |
+ if isinstance(when, datetime.timedelta): | |
+ when = datetime.datetime.now() + when | |
+ if isinstance(when, datetime.datetime): | |
+ when = time.mktime(when.timetuple()) | |
+ elif isinstance(when, (int, float)): | |
+ when = time.time() + when | |
+ else: | |
+ self.debug("Call to %s wasn't scheduled with a suitable time: %s", | |
+ callback.func_name, when) | |
+ return | |
+ self.debug("Scheduling call to %s at %s", | |
+ callback.func_name, datetime.datetime.fromtimestamp(when).ctime()) | |
+ heapq.heappush(self.events, (when, callback, args, kwargs)) | |
def start (self): | |
self.running = True | |
@@ -224,6 +258,7 @@ class Router (component.Receiver): | |
# wait until we're asked to stop | |
while self.running: | |
try: | |
+ self.call_scheduled() | |
self.run() | |
except KeyboardInterrupt: | |
break | |
@@ -240,6 +275,15 @@ class Router (component.Receiver): | |
msg = self.next_message(timeout=1.0) | |
if msg is not None: | |
self.incoming(msg) | |
+ | |
+ def call_scheduled(self): | |
+ while self.events and self.events[0][0] < time.time(): | |
+ when, callback, args, kwargs = heapq.heappop(self.events) | |
+ self.info("Calling %s(%s, %s)", | |
+ callback.func_name, args, kwargs) | |
+ result = callback(*args, **kwargs) | |
+ if result: | |
+ self.call_at(result, callback, *args, **kwargs) | |
def __sorted_apps(self): | |
return sorted(self.apps, key=lambda a: a.priority()) | |
@@ -306,10 +350,3 @@ class Router (component.Receiver): | |
self.debug("SENT message '%s' to %s via %s" % (message.text,\ | |
message.connection.identity, message.connection.backend.slug)) | |
return True | |
- | |
- def get_backend (self, name): | |
- backends = [b for b in self.backends if b.name == name] | |
- if backends: | |
- return backends[0] | |
- else: | |
- self.error("Unknown backend requested: %s", name) | |
diff --git a/lib/rapidsms/tests/scripted.py b/lib/rapidsms/tests/scripted.py | |
index 6b58fd2..5f41622 100644 | |
--- a/lib/rapidsms/tests/scripted.py | |
+++ b/lib/rapidsms/tests/scripted.py | |
@@ -4,7 +4,10 @@ from harness import MockRouter, EchoApp | |
from rapidsms.backends.backend import Backend | |
from rapidsms.message import Message | |
import unittest, re | |
-from django.test import TestCase | |
+try: | |
+ from django.test import TestCase | |
+except: | |
+ from unittest import TestCase | |
from datetime import datetime | |
class MetaTestScript (type): | |
diff --git a/lib/rapidsms/tests/test_router.py b/lib/rapidsms/tests/test_router.py | |
index 074af1d..1ac6e00 100755 | |
--- a/lib/rapidsms/tests/test_router.py | |
+++ b/lib/rapidsms/tests/test_router.py | |
@@ -1,8 +1,10 @@ | |
#!/usr/bin/env python | |
# vim: ai ts=4 sts=4 et sw=4 | |
-import unittest, threading | |
+import unittest, threading, time, datetime | |
from rapidsms.router import Router | |
+from rapidsms.connection import Connection | |
+from rapidsms.message import Message | |
from rapidsms.backends.backend import Backend | |
from rapidsms.tests.harness import MockApp, MockLogger | |
@@ -24,7 +26,7 @@ class TestRouter(unittest.TestCase): | |
component = r.build_component("rapidsms.tests.%s.MockApp", | |
{"type":"harness", "title":"test app"}) | |
self.assertEquals(type(component), MockApp, "component has right type") | |
- self.assertEquals(component.name, "test app", "component has right title") | |
+ self.assertEquals(component.title, "test app", "component has right title") | |
self.assertRaises(Exception, r.build_component, | |
("rapidsms.tests.%s.MockApp", | |
{"type":"harness", "title":"test app", "argh": "no config"}), | |
@@ -58,10 +60,57 @@ class TestRouter(unittest.TestCase): | |
pass | |
def test_start_and_stop (self): | |
- pass | |
+ r = Router() | |
+ r.logger = MockLogger() | |
+ threading.Thread(target=r.start).start() | |
+ self.assertTrue(r.running) | |
+ r.stop() | |
+ self.assertTrue(not r.running) | |
+ # not waiting for the router to shutdown causes exceptions | |
+ # on global destruction. (race condition) | |
+ time.sleep(1.0) | |
def test_run(self): | |
- pass | |
+ r = Router() | |
+ r.logger = MockLogger() | |
+ app = r.build_component("rapidsms.tests.%s.MockApp", | |
+ {"type":"harness", "title":"test app"}) | |
+ r.apps.append(app) | |
+ r.add_backend({"type":"backend", "title":"test_backend"}) | |
+ backend = r.get_backend("test-backend") # NOTE the dash; FIXME | |
+ msg = backend.message("test user", "test message") | |
+ r.send(msg) | |
+ r.run() | |
+ received = app.calls[-1][1] | |
+ self.assertEquals(msg, received, "message is identical") | |
+ self.assertEquals(msg.connection, received.connection, "same connection") | |
+ self.assertEquals(msg.text, received.text, "same text") | |
+ | |
+ def test_call_at (self): | |
+ def callback(stash, arg1, **argv): | |
+ stash["arg1"]=arg1 | |
+ if "try_again" in argv and "try_again" not in stash: | |
+ stash["try_again"] = False | |
+ return 1.0 | |
+ else: | |
+ stash.update(argv) | |
+ r = Router() | |
+ r.logger = MockLogger() | |
+ stash = {} | |
+ r.call_at(0.5, callback, stash, 1, arg2="a") | |
+ r.call_at(datetime.datetime.now() + datetime.timedelta(seconds=0.5), callback, stash, 1, arg3="b") | |
+ r.call_at(datetime.timedelta(seconds=1.0), callback, stash, 1, try_again=True) | |
+ r.call_at(3, callback, stash, 2) | |
+ threading.Thread(target=r.start).start() | |
+ time.sleep(1.0) | |
+ self.assertEquals(stash["arg1"], 1, "*args ok") | |
+ self.assertEquals(stash["arg2"], "a", "**kargs ok") | |
+ self.assertEquals(stash["arg3"], "b", "datetime works") | |
+ self.assertEquals(stash["try_again"], False, "timedelta works") | |
+ time.sleep(3.0) | |
+ self.assertEquals(stash["try_again"], True, "repeated callback") | |
+ self.assertEquals(stash["arg1"], 2, "int works") | |
+ r.stop() | |
def test_incoming(self): | |
pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment