Skip to content

Instantly share code, notes, and snippets.

@schuyler
Created June 29, 2009 09:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save schuyler/137537 to your computer and use it in GitHub Desktop.
Save schuyler/137537 to your computer and use it in GitHub Desktop.
scheduler patch for RapidSMS
diff --git a/lib/rapidsms/router.py b/lib/rapidsms/router.py
index 0785499..ddecd0a 100644
--- a/lib/rapidsms/router.py
+++ b/lib/rapidsms/router.py
@@ -1,7 +1,7 @@
#!/usr/bin/env python
# vim: ai ts=4 sts=4 et sw=4
-import time, datetime, os
+import time, datetime, os, heapq
import threading
import traceback
@@ -16,6 +16,7 @@ class Router (component.Receiver):
component.Receiver.__init__(self)
self.backends = []
self.apps = []
+ self.events = []
self.running = False
self.logger = None
@@ -179,6 +180,39 @@ class Router (component.Receiver):
except Exception:
self.log_last_exception("The %s backend failed to stop" % backend.slug)
+ """
+ The call_at() method schedules a callback with the router. It takes as its
+ first argument one of several possible objects:
+
+ * if an int or float, the callback is called that many
+ seconds later.
+ * if a datetime.datetime object, the callback is called at
+ the time specified.
+ * if a datetime.timedelta object, the callback is called
+ at the timedelta from now.
+
+ call_at() takes as its second argument the function to be called
+ at the scheduled time. All other positional and keyword arguments
+ are passed directly to the callback when it is called.
+
+ If the callback returns a true value that is one of the time objects
+ understood by call_at(), the callback will be called again at the
+ specified time with the same arguments.
+ """
+ def call_at (self, when, callback, *args, **kwargs):
+ if isinstance(when, datetime.timedelta):
+ when = datetime.datetime.now() + when
+ if isinstance(when, datetime.datetime):
+ when = time.mktime(when.timetuple())
+ elif isinstance(when, (int, float)):
+ when = time.time() + when
+ else:
+ self.debug("Call to %s wasn't scheduled with a suitable time: %s",
+ callback.func_name, when)
+ return
+ self.debug("Scheduling call to %s at %s",
+ callback.func_name, datetime.datetime.fromtimestamp(when).ctime())
+ heapq.heappush(self.events, (when, callback, args, kwargs))
def start (self):
self.running = True
@@ -224,6 +258,7 @@ class Router (component.Receiver):
# wait until we're asked to stop
while self.running:
try:
+ self.call_scheduled()
self.run()
except KeyboardInterrupt:
break
@@ -240,6 +275,15 @@ class Router (component.Receiver):
msg = self.next_message(timeout=1.0)
if msg is not None:
self.incoming(msg)
+
+ def call_scheduled(self):
+ while self.events and self.events[0][0] < time.time():
+ when, callback, args, kwargs = heapq.heappop(self.events)
+ self.info("Calling %s(%s, %s)",
+ callback.func_name, args, kwargs)
+ result = callback(*args, **kwargs)
+ if result:
+ self.call_at(result, callback, *args, **kwargs)
def __sorted_apps(self):
return sorted(self.apps, key=lambda a: a.priority())
@@ -306,10 +350,3 @@ class Router (component.Receiver):
self.debug("SENT message '%s' to %s via %s" % (message.text,\
message.connection.identity, message.connection.backend.slug))
return True
-
- def get_backend (self, name):
- backends = [b for b in self.backends if b.name == name]
- if backends:
- return backends[0]
- else:
- self.error("Unknown backend requested: %s", name)
diff --git a/lib/rapidsms/tests/scripted.py b/lib/rapidsms/tests/scripted.py
index 6b58fd2..5f41622 100644
--- a/lib/rapidsms/tests/scripted.py
+++ b/lib/rapidsms/tests/scripted.py
@@ -4,7 +4,10 @@ from harness import MockRouter, EchoApp
from rapidsms.backends.backend import Backend
from rapidsms.message import Message
import unittest, re
-from django.test import TestCase
+try:
+ from django.test import TestCase
+except:
+ from unittest import TestCase
from datetime import datetime
class MetaTestScript (type):
diff --git a/lib/rapidsms/tests/test_router.py b/lib/rapidsms/tests/test_router.py
index 074af1d..1ac6e00 100755
--- a/lib/rapidsms/tests/test_router.py
+++ b/lib/rapidsms/tests/test_router.py
@@ -1,8 +1,10 @@
#!/usr/bin/env python
# vim: ai ts=4 sts=4 et sw=4
-import unittest, threading
+import unittest, threading, time, datetime
from rapidsms.router import Router
+from rapidsms.connection import Connection
+from rapidsms.message import Message
from rapidsms.backends.backend import Backend
from rapidsms.tests.harness import MockApp, MockLogger
@@ -24,7 +26,7 @@ class TestRouter(unittest.TestCase):
component = r.build_component("rapidsms.tests.%s.MockApp",
{"type":"harness", "title":"test app"})
self.assertEquals(type(component), MockApp, "component has right type")
- self.assertEquals(component.name, "test app", "component has right title")
+ self.assertEquals(component.title, "test app", "component has right title")
self.assertRaises(Exception, r.build_component,
("rapidsms.tests.%s.MockApp",
{"type":"harness", "title":"test app", "argh": "no config"}),
@@ -58,10 +60,57 @@ class TestRouter(unittest.TestCase):
pass
def test_start_and_stop (self):
- pass
+ r = Router()
+ r.logger = MockLogger()
+ threading.Thread(target=r.start).start()
+ self.assertTrue(r.running)
+ r.stop()
+ self.assertTrue(not r.running)
+ # not waiting for the router to shutdown causes exceptions
+ # on global destruction. (race condition)
+ time.sleep(1.0)
def test_run(self):
- pass
+ r = Router()
+ r.logger = MockLogger()
+ app = r.build_component("rapidsms.tests.%s.MockApp",
+ {"type":"harness", "title":"test app"})
+ r.apps.append(app)
+ r.add_backend({"type":"backend", "title":"test_backend"})
+ backend = r.get_backend("test-backend") # NOTE the dash; FIXME
+ msg = backend.message("test user", "test message")
+ r.send(msg)
+ r.run()
+ received = app.calls[-1][1]
+ self.assertEquals(msg, received, "message is identical")
+ self.assertEquals(msg.connection, received.connection, "same connection")
+ self.assertEquals(msg.text, received.text, "same text")
+
+ def test_call_at (self):
+ def callback(stash, arg1, **argv):
+ stash["arg1"]=arg1
+ if "try_again" in argv and "try_again" not in stash:
+ stash["try_again"] = False
+ return 1.0
+ else:
+ stash.update(argv)
+ r = Router()
+ r.logger = MockLogger()
+ stash = {}
+ r.call_at(0.5, callback, stash, 1, arg2="a")
+ r.call_at(datetime.datetime.now() + datetime.timedelta(seconds=0.5), callback, stash, 1, arg3="b")
+ r.call_at(datetime.timedelta(seconds=1.0), callback, stash, 1, try_again=True)
+ r.call_at(3, callback, stash, 2)
+ threading.Thread(target=r.start).start()
+ time.sleep(1.0)
+ self.assertEquals(stash["arg1"], 1, "*args ok")
+ self.assertEquals(stash["arg2"], "a", "**kargs ok")
+ self.assertEquals(stash["arg3"], "b", "datetime works")
+ self.assertEquals(stash["try_again"], False, "timedelta works")
+ time.sleep(3.0)
+ self.assertEquals(stash["try_again"], True, "repeated callback")
+ self.assertEquals(stash["arg1"], 2, "int works")
+ r.stop()
def test_incoming(self):
pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment