Skip to content

Instantly share code, notes, and snippets.

@ahopkins
Last active November 15, 2021 12:03
Show Gist options
  • Save ahopkins/52bcd7d15de1e0356ee22f82b6cbf9c8 to your computer and use it in GitHub Desktop.
Save ahopkins/52bcd7d15de1e0356ee22f82b6cbf9c8 to your computer and use it in GitHub Desktop.
Sanic Websocket Feed

Sanic Websockets Feeds v1

This is an outdated version

See latest

from .objects import Feed
# from sanic_jwt.decorators import protected
# Process-wide registry of live feeds, keyed by feed name.
feeds = {}


def get_feed(feed_name, app):
    """Return the feed registered under *feed_name*, creating it on first use.

    A new ``Feed`` is built with ``app`` and ``feed_name`` only when the name
    is not yet present in the module-level ``feeds`` registry; later calls
    with the same name return that same object.
    """
    if feed_name not in feeds:
        feeds[feed_name] = Feed(app=app, feed_name=feed_name)
    return feeds[feed_name]
def startup(app):
    """Register the ``/feed/<feed_name>`` WebSocket route on *app*.

    The registered handler derives the channel name from the last
    ``__``-separated segment of ``feed_name``, looks up (or creates) the
    matching feed through ``get_feed`` and hands the socket to ``feed.run``.
    """

    @app.websocket('/feed/<feed_name>')
    async def feed(request, ws, feed_name):
        # TODO:
        # - In order to protect the view to authenticated users
        #   (e.g. with sanic_jwt's ``@protected()`` decorator):
        #   - V1
        #     - Upon subscription, store the client with a token
        #     - Send the token to the browser through the client
        #     - Client receives the token and submits AJAX call with token
        #     - Endpoint receives the token and uses existing authentication
        #       to update the client as authenticated
        #   - V2
        #     - Before opening WebSocket, send HTTP request
        #     - Generate ticket with the access credentials
        #     - Return to be stored in browser
        #     - Open WebSocket, send the ticket
        #     - Attach the client connection to the ticket and access
        #       credentials
        channel_name = feed_name.split('__')[-1]
        print(f'The feed_name: {feed_name}')
        print(f'The channel_name: {channel_name}')
        # NOTE: the request token, headers and object dumps are deliberately
        # not printed; they can carry credentials that must not reach logs.
        channel_feed = get_feed(channel_name, request.app)
        await channel_feed.run(ws)
import json
import asyncio
from websockets.exceptions import ConnectionClosed
class Feed:
    """Hub for one named channel shared by several WebSocket clients.

    Messages received from a client are decoded as JSON and dispatched to a
    public action method (such as ``persist``, which publishes through
    ``app.redis``). Messages read from ``app.pubsub`` are sent to every
    client currently in ``self.clients``.
    """

    def __init__(self, app, feed_name):
        self.name = feed_name
        self.app = app
        self.clients = set()

    def __len__(self):
        """Return the number of currently registered clients."""
        return len(self.clients)

    async def run(self, client):
        """Serve *client* until its consumer or producer task finishes.

        The producer loop never ends on its own, so waiting for *all* tasks
        would block forever after the client disconnects. Instead, return as
        soon as the first task completes, cancel the other one and make sure
        the client is removed from the feed.
        """
        await self._subscribe(client)
        tasks = self.get_tasks(client)
        try:
            done, _ = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                # Re-raise an unexpected failure instead of dropping it.
                task.result()
        finally:
            for task in tasks:
                task.cancel()  # no-op for tasks that already finished
            self._leave(client)

    def get_tasks(self, client):
        """Schedule and return the ``[consumer, producer]`` tasks for *client*."""
        consumer_task = asyncio.ensure_future(self._consumer_handler(client))
        producer_task = asyncio.ensure_future(self._producer_handler())
        return [consumer_task, producer_task]

    async def persist(self, payload):
        """Publish *payload*, JSON-encoded, on this feed's channel."""
        print('Message to persist: ', payload)
        output = json.dumps(payload)
        await self.app.redis.publish(self.name, output)

    async def _consumer_handler(self, client):
        """Read messages from *client* and ingest them until it disconnects."""
        print('')
        print('consumer_handler')
        while True:
            try:
                message = await client.recv()
                print('')
                print('message arrived', message)
                await self._ingest(message)
            except ConnectionClosed:
                print('closing connection')
                self._leave(client)
                break

    async def _producer_handler(self):
        """Poll ``app.pubsub`` once per second and broadcast what arrives."""
        print('')
        print('producer_handler')
        while True:
            message = await self.app.pubsub.get_message()
            if message:
                await self._broadcast(message)
            await asyncio.sleep(1)

    async def _ingest(self, message):
        """Decode a client message and invoke the public action it names.

        The message comes from the network, so it is treated as untrusted:
        invalid JSON, non-object JSON, non-string action names, names starting
        with an underscore and non-callable attributes are all reported and
        ignored rather than dispatched.
        """
        print('ingesting', message)
        try:
            message = json.loads(message)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            print('Could not decode the message:')
            print(message)
            return

        action = None
        if isinstance(message, dict):
            action_name = message.get('action')
            # Never let a client reach private helpers or dunder methods.
            if isinstance(action_name, str) and not action_name.startswith('_'):
                action = getattr(self, action_name, None)

        if callable(action):
            await action(payload=message.get('payload', None))
        else:
            print('Could not locate the action:')
            print(message)

    async def _broadcast(self, message):
        """Send the ``data`` field of *message* to every registered client."""
        print('broadcasting to {} clients:'.format(len(self.clients)), message)
        data = self._publishable(message.get('data', None))
        # Iterate over a snapshot: _leave() mutates self.clients, and other
        # coroutines may add or remove clients while we await send().
        for client in tuple(self.clients):
            try:
                await client.send(data)
            except ConnectionClosed:
                print('closing connection')
                self._leave(client)

    def _leave(self, client):
        """Remove *client* from the feed; unknown clients are ignored."""
        self.clients.discard(client)

    async def _subscribe(self, client):
        """Subscribe to this feed's channel and register *client*."""
        await self.app.pubsub.subscribe(self.name)
        self.clients.add(client)

    def _publishable(self, raw):
        """Convert a pub/sub payload into the text sent to clients.

        ``None`` becomes an empty string, ``str`` is passed through, bytes are
        decoded as UTF-8 (``json.dumps`` cannot serialise them) and anything
        else is JSON-encoded.
        """
        if raw is None:
            return ''
        if isinstance(raw, str):
            return raw
        if isinstance(raw, (bytes, bytearray)):
            return bytes(raw).decode('utf-8')
        return json.dumps(raw)
@greentornado
Copy link

greentornado commented Apr 25, 2019

Thanks for this. Are you still using this gist? Do you use aioredis to build `app.pubsub`?

@ahopkins
Copy link
Author

Yes. I have something that is based on this with a few more features built in. I use both aredis and aioredis: I prefer the latter for handling pub/sub, and the former for its API in general Redis usage.

@ahopkins
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment