Skip to content

Instantly share code, notes, and snippets.

@mivade
Last active July 5, 2019 05:48
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mivade/0ac9e8ec58acd2366133 to your computer and use it in GitHub Desktop.
Save mivade/0ac9e8ec58acd2366133 to your computer and use it in GitHub Desktop.
Using Tornado websockets for the publish/subscribe pattern
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Tornado Websocket Pubsub</title>
<meta name="description" content="Tornado Websocket Pubsub">
<meta name="author" content="Michael V. DePalatis">
<!--[if lt IE 9]>
<script src="http://html5shiv.googlecode.com/svn/trunk/html5.js"></script>
<![endif]-->
<script src="pubsub.js"></script>
</head>
<body>
<div id="placeholder"></div>
</body>
</html>
var socket,
url = window.location;
function connect() {
socket = new WebSocket('ws://' + url.host + '/socket');
socket.onmessage = function (msg) {
var data = JSON.parse(msg.data),
html = data.value.toString() + ' ';
document.querySelector('#placeholder').innerHTML += html;
console.log(data.value);
};
}
window.onload = function () {
connect();
};
window.onbeforeunload = function () {
socket.close();
};
"""A demonstration of using websockets for pubsub-like functionality in
Tornado.
"""
import random
from tornado import gen
from tornado.options import options, define
from tornado.ioloop import IOLoop
from tornado.web import Application, RequestHandler, StaticFileHandler
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from tornado.queues import Queue
class Publisher(object):
"""Handles new data to be passed on to subscribers."""
def __init__(self):
self.messages = Queue()
self.subscribers = set()
def register(self, subscriber):
"""Register a new subscriber."""
self.subscribers.add(subscriber)
def deregister(self, subscriber):
"""Stop publishing to a subscriber."""
self.subscribers.remove(subscriber)
@gen.coroutine
def submit(self, message):
"""Submit a new message to publish to subscribers."""
yield self.messages.put(message)
@gen.coroutine
def publish(self):
while True:
message = yield self.messages.get()
if len(self.subscribers) > 0:
print("Pushing message {} to {} subscribers...".format(
message, len(self.subscribers)))
yield [subscriber.submit(message) for subscriber in self.subscribers]
class MainHandler(RequestHandler):
"""Renders the main template for displaying messages to subscribers."""
def get(self):
self.render('index.html')
class Subscription(WebSocketHandler):
"""Websocket for subscribers."""
def initialize(self, publisher):
self.publisher = publisher
self.messages = Queue()
self.finished = False
def open(self):
print("New subscriber.")
self.publisher.register(self)
self.run()
def on_close(self):
self._close()
def _close(self):
print("Subscriber left.")
self.publisher.deregister(self)
self.finished = True
@gen.coroutine
def submit(self, message):
yield self.messages.put(message)
@gen.coroutine
def run(self):
while not self.finished:
message = yield self.messages.get()
print("New message: " + str(message))
self.send(message)
def send(self, message):
try:
self.write_message(dict(value=message))
except WebSocketClosedError:
self._close()
@gen.coroutine
def generate_data(publisher):
while True:
data = random.randint(0, 9)
yield publisher.submit(data)
yield gen.sleep(random.randint(0, 2))
@gen.coroutine
def main():
define('port', default=8080)
options.parse_command_line()
publisher = Publisher()
app = Application(
[
('/((\w*).js)', StaticFileHandler, dict(path='.')),
('/', MainHandler),
('/socket', Subscription, dict(publisher=publisher))
]
)
app.listen(options.port)
yield [publisher.publish(), generate_data(publisher)]
if __name__ == "__main__":
IOLoop.instance().run_sync(main)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment