@jkp
Created November 22, 2009 13:54
Experimental code to add async functionality to WSGI applications running under Tornado
#!/usr/bin/env python
#
# Jamie Kirkpatrick, November 2009. <jkp@kirkconsulting.co.uk>
# Released under the BSD license.
#
"""
Experimental code to add asynchronous functionality to WSGI applications
running under the Tornado webserver. Uses greenlet to spin micro-threads
which can be suspended and resumed within a single thread as required.
"""
from greenlet import greenlet
from tornado.wsgi import WSGIContainer
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from threading import local
thread_data = local()
thread_data.tasks = set()
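# Illustrative sketch (not part of the original gist): the greenlet primitives
# the container below relies on. greenlet(fn) creates a micro-thread whose
# parent is the greenlet that created it; child.switch() runs it until it
# calls parent.switch(), which suspends it, and a later child.switch(value)
# resumes it with `value` as the result of that earlier parent.switch() call.
#
#   from greenlet import greenlet, getcurrent
#
#   def worker():
#       value = getcurrent().parent.switch("suspended")  # yield to parent
#       print("resumed with %s" % value)
#
#   child = greenlet(worker)
#   print(child.switch())    # runs worker until it suspends -> "suspended"
#   child.switch("hello")    # resumes worker -> prints "resumed with hello"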
class AsyncMixin(object):
    """
    Mixin for WSGI applications that wish to implement async functionality.
    Subclasses can pass a bound version of the resume function to some
    long-running task and have it invoke the callback with its results.
    When the callback is invoked the suspended request will resume.
    """
    def async_wait(self, environ):
        """
        Suspends the request until the resume function is called.
        """
        thread_data.tasks.remove(environ["async.task"])
        return environ["async.suspend"]()

    def resume(self, environ, *args, **kwargs):
        """
        Resumes a previously suspended request.
        """
        thread_data.tasks.add(environ["async.task"])
        environ["async.resume"](*args, **kwargs)
class AsyncWSGIContainer(WSGIContainer):
    """
    WSGI container subclass that adds async functionality to requests.
    """
    def _environ(self, request):
        environ = super(AsyncWSGIContainer, self)._environ(request)
        environ["async.task"] = request.task
        environ["async.suspend"] = request.suspend
        environ["async.resume"] = request.resume
        return environ

    def __call__(self, request):
        # Run the standard WSGI handling inside a greenlet so the request
        # can suspend itself (switch to parent) and be resumed later.
        task = greenlet(super(AsyncWSGIContainer, self).__call__)
        thread_data.tasks.add(task)
        request.task = task
        request.suspend = task.parent.switch
        request.resume = task.switch
        task.switch(request)
        # Drive any remaining runnable tasks, pruning finished ones.
        while len(thread_data.tasks):
            for task in list(thread_data.tasks):
                if task.dead:
                    thread_data.tasks.remove(task)
                task.switch()
class UpdatesMixin(AsyncMixin):
    """
    An example mixin that shows the ability to push async
    updates down a long-running request. Based on the example code for
    the cappuccino-tornado integration.
    See: http://github.com/eliasklughammer/Cappuccino-X-Tornado.
    """
    waiters = []

    def wait_for_updates(self, environ):
        self.waiters.append(lambda *a, **k: self.resume(environ, *a, **k))
        return self.async_wait(environ)

    @classmethod
    def new_updates(cls, updates):
        for callback in cls.waiters:
            callback(updates)
        cls.waiters = []
class ExampleApplication(UpdatesMixin):
    """
    An example application that either waits asynchronously for updates or
    posts them to listeners.
    """
    def __call__(self, environ, start_response):
        if environ["PATH_INFO"] == "/async":
            # Suspend here until another request pushes an update.
            updates = self.wait_for_updates(environ)
            start_response("200 OK", [("Content-type", "text/plain")])
            return ["Updates: %s" % updates]
        else:
            start_response("200 OK", [("Content-type", "text/plain")])
            self.new_updates("Update arrived")
            return ["Pushed updates"]
if __name__ == "__main__":
    app = ExampleApplication()
    container = AsyncWSGIContainer(app)
    http_server = HTTPServer(container)
    http_server.listen(8888)
    IOLoop.instance().start()
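A quick way to exercise the example once the server above is running on localhost:8888 (a rough sketch only; "/push" is an arbitrary path, since any path other than /async pushes an update):

import threading
import time
import urllib2

def poll():
    # blocks inside wait_for_updates() on the server until an update arrives
    print(urllib2.urlopen("http://localhost:8888/async").read())

poller = threading.Thread(target=poll)
poller.start()
time.sleep(1)  # give the long-poll request time to reach the server
# hitting any other path pushes "Update arrived", resuming the suspended request
print(urllib2.urlopen("http://localhost:8888/push").read())
poller.join()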
@dlo

dlo commented Jun 14, 2010

Awesome!

@jkp

jkp commented Jun 15, 2010

Glad to see it was useful for someone! I completely forgot I did this... :)

@hamannp

hamannp commented Apr 5, 2011

Did you ever get it working with a real app? I want to use repoze.what with Tornado.

@jkp

jkp commented Apr 5, 2011

I used it once or twice, yeah... didn't give it a major hammering, but it seemed to do the trick :)
