Created
May 25, 2012 13:01
-
-
Save guyzmo/2788003 to your computer and use it in GitHub Desktop.
A Tornado-based library that enables Event Source support!
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import argparse | |
import logging | |
log = logging.getLogger('eventsource.client') | |
from tornado.ioloop import IOLoop | |
from tornado.httpclient import AsyncHTTPClient, HTTPRequest | |
class Event(object):
    """
    A single event as decoded by the client from the event stream.

    All attributes start as None and are filled in by the stream parser:
    name from the `event` field, data from the `data` lines (joined with
    newlines) and id from the `id` field.
    """

    def __init__(self):
        self.name = None
        self.data = None
        self.id = None

    def __repr__(self):
        """
        @return "Event<id,name,data>" with the newlines of data escaped
        """
        # data stays None until a `data` field has been seen, so only escape
        # newlines when there is text to escape.
        data = self.data
        if data is not None:
            data = data.replace('\n', '\\n')
        return "Event<%s,%s,%s>" % (str(self.id), str(self.name), str(data))
class EventSourceClient(object):
    """
    Minimal Event Source client built on Tornado.

    It issues one streaming GET request and decodes every chunk received on
    it into an Event that is appended to an internal buffer.
    """

    def __init__(self, url, action, target):
        """
        @param url "host:port" part of the address to connect to
        @param action first path component of the request URL
        @param target second path component of the request URL
        """
        self._url = "http://%s/%s/%s" % (url, action, target)
        # Select the curl-based implementation before instantiating the client.
        AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
        self.http_client = AsyncHTTPClient()
        self._event_buffer = []

    def poll(self):
        """
        Sends the streaming request and runs the IOLoop until
        handle_request() stops it.
        """
        # NOTE(review): request_timeout=0 is presumably meant to keep the
        # stream open indefinitely - confirm against the HTTP client in use.
        request = HTTPRequest(url=self._url,
                              method='GET',
                              headers={"content-type": "text/event-stream"},
                              request_timeout=0,
                              streaming_callback=self.handle_stream)
        self.http_client.fetch(request, self.handle_request)
        IOLoop.instance().start()

    def handle_stream(self, message):
        """
        Decodes one chunk of the stream into an Event and buffers it.

        Lines starting with a colon are comments and are only logged; fields
        other than event, data and id are ignored with a warning.

        @param message chunk made of "field: value" lines separated by CRLF
                       (assumes a text string - TODO confirm for HTTP
                       clients that deliver bytes)
        """
        event = Event()
        for line in message.strip('\r\n').split('\r\n'):
            if not line:
                # blank separator line or empty chunk: nothing to decode
                continue
            # partition() never fails: a line without a colon yields the
            # whole line as field name and an empty value.
            field, _, value = line.partition(":")
            if field == '':
                log.info("received comment: %s", value)
                continue
            # drop the space that follows the colon, for every field
            value = value.lstrip()
            if field == 'event':
                event.name = value
            elif field == 'data':
                if event.data is None:
                    event.data = value
                else:
                    event.data = "%s\n%s" % (event.data, value)
            elif field == 'id':
                event.id = value
            else:
                # raising here would abort the streaming callback
                log.warning("ignoring unknown field: %s", field)
        self._event_buffer.append(event)
        # lazy formatting: the event is only rendered when debug is enabled
        log.debug("received event: %s", event)

    def handle_request(self, response):
        """
        Completion callback of the request: prints the error or the body,
        then stops the IOLoop so that poll() returns.

        @param response response object handed over by the HTTP client
        """
        if response.error:
            print("Error: %s" % (response.error,))
        else:
            print(response.body)
        IOLoop.instance().stop()
def start():
    """
    Command-line entry point of the client.

    Parses the host, port, debug and token options, configures logging
    accordingly and polls the "poll" channel of the given token.
    """
    parser = argparse.ArgumentParser(prog=sys.argv[0],
                                     description="Event Source Client")

    # (flags, keyword settings) of every supported option, in help order
    option_table = (
        (("-H", "--host"),
         dict(dest="host", default='127.0.0.1', help='Host to connect to.')),
        (("-P", "--port"),
         dict(dest="port", default='8888', help='Port to be used connection.')),
        (("-d", "--debug"),
         dict(dest="debug", action="store_true", help='enables debug output')),
        (("-t", "--token"),
         dict(dest="token", required=True, help='Token to be used for connection')),
    )
    for flags, settings in option_table:
        parser.add_argument(*flags, **settings)

    args = parser.parse_args(sys.argv[1:])

    verbosity = logging.DEBUG if args.debug else logging.INFO
    logging.basicConfig(level=verbosity)

    address = "%s:%s" % (args.host, args.port)
    client = EventSourceClient(address, "poll", args.token)
    client.poll()


if __name__ == "__main__":
    # only run the client when executed as a script
    start()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# resources: | |
# * http://stackoverflow.com/questions/10665569/websocket-event-source-implementation-to-expose-a-two-way-rpc-to-a-python-dj | |
# * http://stackoverflow.com/questions/8812715/using-a-simple-python-generator-as-a-co-routine-in-a-tornado-async-handler | |
# * http://dev.w3.org/html5/eventsource/#event-stream-interpretation | |
# * still to be implemented: | |
# * id and retry fields support | |
# * comment support. | |
import json | |
import httplib | |
import tornado.web | |
import tornado.ioloop | |
import logging as log | |
class Event(object):
    """
    Class that defines an event, its behaviour and the matching actions

    LISTEN is the GET event that will open an event source communication
    FINISH is the POST event that will end an event source communication started by LISTEN
    ACTIONS contains the list of acceptable POST targets.

    target is the token that matches an event source channel
    action contains the name of the action (which shall be in ACTIONS)
    value contains a list of every lines of the value to be parsed
    """
    content_type = "text/plain"

    LISTEN = "poll"
    FINISH = "close"
    ACTIONS = [FINISH]

    def get_value(self):
        """
        @return the stored value as a list of lines: the raw value as a
                single line, or an empty list when no value was given
        """
        if self._value is None:
            return []
        return [self._value]

    def set_value(self, v):
        """
        Stores the raw value as is.
        @param v value to be embedded
        """
        self._value = v

    # Dispatch through self so that a subclass overriding get_value() or
    # set_value() is honoured even when it does not redeclare the property.
    value = property(lambda self: self.get_value(),
                     lambda self, v: self.set_value(v))

    def __init__(self, target, action, value=None):
        """
        Creates a new Event object with
        @param target a string matching an open channel
        @param action a string matching an action in the ACTIONS list
        @param value a value to be embedded
        """
        self.target = target
        self.action = action
        self.set_value(value)
## | |
class StringEvent(Event):
    """
    Event carrying plain text, exposed as one list entry per text line.
    """
    ACTIONS = ["ping", Event.FINISH]

    def get_value(self):
        """
        @return the list of lines of the stored text, or an empty list when
                no value was given
        """
        if self._value is None:
            # the value is optional when the event is created, so there may
            # be no text to split
            return []
        return self._value.split('\n')

    def set_value(self, v):
        """
        @param v text to be stored, kept unchanged
        """
        self._value = v

    value = property(get_value, set_value)
class JSONEvent(Event):
    """
    Event carrying a JSON document: the value is decoded when it is set and
    serialized back to a single line when it is read.
    """
    content_type = "application/json"

    LISTEN = "poll"
    FINISH = "close"
    ACTIONS = ["ping", FINISH]

    def get_value(self):
        """
        @return a one-element list holding the JSON text of the value
        """
        serialized = json.dumps(self._value)
        return [serialized]

    def set_value(self, v):
        """
        @param v JSON text; json.loads() raises if it cannot be decoded
        """
        decoded = json.loads(v)
        self._value = decoded

    value = property(fget=get_value, fset=set_value)
## | |
class EventSourceHandler(tornado.web.RequestHandler):
    """
    Request handler implementing an event source channel per target token.

    A GET on the LISTEN action opens the channel and keeps the connection
    open; a POST on one of the ACTIONS buffers an event that is then
    forwarded to the listening connection.
    """
    # Registries shared by every handler instance (class attributes):
    # _connected maps a listening handler to its target token,
    # _events maps a target token to its list of pending events.
    _connected = {}
    _events = {}

    def initialize(self, event_class=StringEvent):
        """
        Takes an Event based class to define the event's handling
        """
        self._event_class = event_class

    # Tools

    def push(self, event):
        """
        For a given event, write event-source outputs on current handler
        @param event Event based incoming event
        """
        self.write("event: "+unicode(event.action)+"\r\n")
        for line in event.value:
            self.write("data: %s\r\n" % (unicode(line),))
        self.write("\r\n")
        self.flush()

    def buffer_event(self, target, action, value=None):
        """
        creates and store an event for the target
        @param target string identifying current target
        @param action string matching one of Event.ACTIONS
        @param value string containing a value
        """
        self._events[target].append(self._event_class(target, action, value))

    def is_connected(self, target):
        """
        @param target string identifying a given target
        @return true if target is connected
        """
        return target in self._connected.values()

    def set_connected(self, target):
        """
        registers target as being connected
        @param target string identifying a given target
        this method will add target to the connected list,
        and create an empty event buffer
        """
        log.debug("set_connected(%s)", target)
        self._connected[self] = target
        self._events[target] = []

    def set_disconnected(self):
        """
        unregisters current handler as being connected
        this method will remove target from the connected list,
        and delete the event buffer
        an error is logged when the handler was not registered
        """
        try:
            target = self._connected.pop(self)
        except KeyError as err:
            # handler never registered (or already removed): no target known
            log.error("set_disconnected(%s,%s): %s", str(self), None, err)
            return
        log.debug("set_disconnected(%s)", target)
        # tolerate a missing buffer so the connection entry is never left stale
        self._events.pop(target, None)

    def write_error(self, status_code, **kwargs):
        """
        overloads the write_error() method of RequestHandler, to
        support more explicit messages than only the ones from httplib.
        @param status_code HTTP status code of the error
        @param kwargs may hold `exc_info` (used in debug mode) and `mesg`,
                      an explicit message for the body of the page
        """
        if self.settings.get("debug") and "exc_info" in kwargs:
            # imported here because the module does not import traceback
            import traceback
            # in debug mode, try to send a traceback
            self.set_header('Content-Type', 'text/plain')
            for line in traceback.format_exception(*kwargs["exc_info"]):
                self.write(line)
            self.finish()
        else:
            # not every status code has an entry in the table (e.g. 423)
            message = httplib.responses.get(status_code, "Unknown")
            self.finish("<html><title>%(code)d: %(message)s</title>"
                        "<body>%(code)d: %(mesg)s</body></html>\n" % {
                            "code": status_code,
                            "message": message,
                            # without explicit message, repeat the standard one
                            "mesg": kwargs.get("mesg", message),
                        })

    # Synchronous actions

    def post(self, action, target):
        """
        Triggers an event
        @param action string defining the type of event
        @param target string defining the target handler to send it to
        this method will look for the request body to get post's data.
        """
        log.debug("post(%s,%s)", target, action)
        self.set_header("Accept", self._event_class.content_type)
        if not self.is_connected(target):
            self.send_error(404, mesg="Target is not connected")
        elif action not in self._event_class.ACTIONS:
            self.send_error(404, mesg="Unknown action requested")
        else:
            try:
                self.buffer_event(target, action, self.request.body)
            except ValueError as ve:
                self.send_error(400, mesg="JSON data is not properly formatted: <br />%s" % (ve,))

    # Asynchronous actions

    def _event_generator(self, target):
        """
        removes the events buffered for target one by one, oldest first,
        and yields them
        """
        pending = self._events[target]
        # pop from the front instead of removing while iterating, which
        # would skip every other event
        while pending:
            yield pending.pop(0)

    def _event_loop(self):
        """
        for target matching current handler, gets and forwards all events
        until Event.FINISH is reached, and then closes the channel.
        """
        if self.is_connected(self.target):
            for event in self._event_generator(self.target):
                if event.action == self._event_class.FINISH:
                    self.set_disconnected()
                    self.finish()
                    return
                self.push(event)
            # NOTE(review): re-schedules itself for as long as the target is
            # connected, even when no event is pending
            tornado.ioloop.IOLoop.instance().add_callback(self._event_loop)

    @tornado.web.asynchronous
    def get(self, action, target):
        """
        Opens a new event_source connection and wait for events to come
        Returns error 423 if the target token already exists
        Redirects to / if action is not matching Event.LISTEN.
        """
        log.debug("get(%s,%s)", target, action)
        if action == self._event_class.LISTEN:
            self.set_header("Content-Type", "text/event-stream")
            self.target = target
            if self.is_connected(target):
                self.send_error(423, mesg="Target is already connected")
                return
            self.set_connected(target)
            tornado.ioloop.IOLoop.instance().add_callback(self._event_loop)
        else:
            self.redirect("/", permanent=True)

    def on_connection_close(self):
        """
        overloads RequestHandler's on_connection_close to disconnect
        currents handler on client's socket disconnection.
        """
        log.debug("on_connection_close()")
        self.set_disconnected()
### | |
# Route every "/<action>/<target>" URL to the event source handler,
# configured for plain-text events.
application = tornado.web.Application(handlers=[
    (r"/(.*)/(.*)", EventSourceHandler, {"event_class": StringEvent}),
])

if __name__ == "__main__":
    # `log` is the logging module itself, imported under that alias
    log.basicConfig(level=log.DEBUG)
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
A fully working implementation is available at https://github.com/guyzmo/event-source-library