-
-
Save ecarreras/83e846d0c5ee6f229d571bafd1a714c8 to your computer and use it in GitHub Desktop.
SSE examples
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| from __future__ import absolute_import, unicode_literals | |
| from tools import config | |
class SSE(object):
    """Publish server-sent events through Flask-SSE, with or without an active app."""

    @staticmethod
    def send(data, type=None, id=None, retry=None, channel='sse'):
        """Publish one event on ``channel``, expanding ``*`` patterns.

        If no Flask application is current, a temporary ``Flask`` app is
        created, pointed at ``config['redis_url']``, and the call is repeated
        inside that app's context.

        If ``channel`` contains ``*`` it is treated as a pattern: the event is
        published on every channel returned by
        ``sse.redis.pubsub_channels(channel)`` and then, as for any other
        channel, on the literal ``channel`` string itself.

        :param data: event payload, passed through to ``sse.publish``.
        :param type: event type, passed through to ``sse.publish``.
        :param id: event id, passed through to ``sse.publish``.
        :param retry: retry value, passed through to ``sse.publish``.
        :param channel: channel name, or a pattern containing ``*``.
        :return: whatever the final ``sse.publish`` call on ``channel`` returns.

        ``type`` and ``id`` shadow builtins but are kept because callers pass
        them by keyword.
        """
        # Flask and Flask-SSE are imported at call time, not at module import.
        from flask import Flask, current_app
        from flask.ctx import AppContext
        from flask_sse import sse

        if not current_app:
            # No application is bound: build one just for this call and
            # re-enter, so everything below runs inside an app context.
            app = Flask(__name__)
            app.config['REDIS_URL'] = config['redis_url']
            with AppContext(app):
                return SSE.send(data, type, id, retry, channel)

        if '*' in channel:
            # Publish straight to each matched channel instead of recursing
            # into send(). redis-py may return these names as bytes (Python 3
            # without decode_responses), and re-running the "'*' in channel"
            # test on bytes raises TypeError. Recursing would also re-expand
            # any concrete channel whose name happens to contain '*'.
            for matched_channel in sse.redis.pubsub_channels(channel):
                sse.publish(data, type, id, retry, matched_channel)

        return sse.publish(data, type, id, retry, channel)
class Notify(object):
    """Build notification payloads and publish them as SSE ``message`` events.

    Channel names are built as ``<channel>.<session>.<user>``; ``*`` is used
    in a position to address every session and/or every user.
    """

    # Prefix shared by every notification channel; subclasses may override it.
    channel = "private.user.notifications"

    @staticmethod
    def _build_payload(title, message, message_type, buttons, duration, message_id):
        """Return the notification dict shared by all(), user() and session()."""
        return {
            "title": title,
            "message": message,
            "type": message_type,
            # A fresh list per call, so callers never share a default.
            "buttons": [] if buttons is None else buttons,
            "duration": duration,
            "message_id": message_id,
        }

    @classmethod
    def all(cls, title, message, message_type="info", buttons=None,
            duration=None, on_commit=True, message_id=None):
        """Send a notification on ``<channel>.*`` (every session of every user).

        :param title: notification title.
        :param message: notification body.
        :param message_type: stored under the ``"type"`` key of the payload.
        :param buttons: list stored under ``"buttons"``; ``None`` becomes ``[]``.
        :param duration: stored under ``"duration"``.
        :param on_commit: forwarded to :meth:`notify`.
        :param message_id: stored under ``"message_id"``.
        :return: the result of :meth:`notify`.
        """
        data = cls._build_payload(
            title, message, message_type, buttons, duration, message_id
        )
        channel = "{}.*".format(cls.channel)
        # Dispatch through cls (not the hard-coded class) so a subclass that
        # overrides ``channel`` or ``notify`` is honoured consistently.
        return cls.notify(data=data, channel=channel, on_commit=on_commit)

    @classmethod
    def user(cls, user_id, title, message, message_type="info", buttons=None,
             duration=None, on_commit=True, message_id=None):
        """Send a notification on ``<channel>.*.<user_id>`` (all sessions of one user).

        :param user_id: value placed in the last segment of the channel name.

        The remaining parameters and the return value are as for :meth:`all`.
        """
        data = cls._build_payload(
            title, message, message_type, buttons, duration, message_id
        )
        channel = "{}.*.{}".format(cls.channel, user_id)
        return cls.notify(data=data, channel=channel, on_commit=on_commit)

    @classmethod
    def session(cls, title, message, message_type="info", buttons=None,
                duration=None, on_commit=True, message_id=None):
        """Send a notification on ``<channel>.<session_id>.<current_session.uid>``.

        Parameters and the return value are as for :meth:`all`.
        """
        data = cls._build_payload(
            title, message, message_type, buttons, duration, message_id
        )
        # NOTE(review): ``session_id`` and ``current_session`` are neither
        # parameters nor imported in the visible part of this module. Unless
        # they are provided elsewhere this line raises NameError -- confirm
        # where they are meant to come from.
        channel = "{}.{}.{}".format(cls.channel, session_id, current_session.uid)
        return cls.notify(data=data, channel=channel, on_commit=on_commit)

    @staticmethod
    def notify(data, channel, on_commit=True):
        """Publish ``data`` on ``channel`` as an SSE event of type ``message``.

        When ``on_commit`` is true, ``current_cursor`` is truthy and its
        connection is not read-only, the send is registered through
        ``current_cursor.on_commit`` (presumably deferring it until the
        transaction commits -- TODO confirm). Otherwise it is sent immediately.

        :param data: payload dict handed to ``SSE.send``.
        :param channel: channel name or ``*`` pattern handed to ``SSE.send``.
        :param on_commit: whether to try registering the send on the cursor.
        :return: ``None``.
        """
        # NOTE(review): ``current_cursor`` is not imported in the visible part
        # of this module; confirm where it is defined.
        if on_commit and current_cursor and not current_cursor._cnx.readonly:
            current_cursor.on_commit(
                SSE.send, data=data, channel=channel, type='message'
            )
        else:
            SSE.send(data=data, channel=channel, type='message')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment