Skip to content

Instantly share code, notes, and snippets.

@AstraLuma
Last active November 5, 2023 11:33
Show Gist options
  • Save AstraLuma/bd340e1ba20613ce5c1f1b5ff552ebfb to your computer and use it in GitHub Desktop.
Save AstraLuma/bd340e1ba20613ce5c1f1b5ff552ebfb to your computer and use it in GitHub Desktop.
Python Server-Sent Events/EventSource Implementations
# For Django Channels
import json
from channels.exceptions import StopConsumer
from channels.generic.http import AsyncHttpConsumer
from django.utils.http import parse_header_parameters
from django.utils.http import parse_header_parameters
def get_accepts(scope: dict):
for header, value in scope['headers']:
if header.lower() == b'accept':
yield from parse_accept(value.decode('utf-8'))
def parse_accept(txt: str):
for bit in txt.split(','):
yield parse_header_parameters(bit.strip())
class EventsRouter:
"""
ASGI middleware that seperates EventSource requests from normal ones.
Namely, if the request accepts text/event-stream, SSE is used.
"""
def __init__(self, events, web):
self.events_handler = events
self.web_handler = web
def __call__(self, scope, receive, send):
types = [t.lower() for t, _ in get_accepts(scope)]
if 'text/event-stream' in types and scope['method'] == 'GET':
return self.events_handler(scope, receive, send)
else:
return self.web_handler(scope, receive, send)
class AsyncSSEConsumer(AsyncHttpConsumer):
# This mostly overrides AsyncHttpConsumer but it provides some useful utilities
@property
def last_event_id(self):
"""
The value of the Last-Event-ID header, or None if not given.
Raises an error if the headers haven't been received yet.
"""
return self.scope['headers'].get('Last-Event-ID', None)
# User API
async def send_headers(self, *, status=200, headers=None):
if status == 200:
if headers is None:
headers = [
(b'Content-Type', b'text/event-stream'),
(b'Cache-Control', b'no-cache'),
]
elif isinstance(headers, dict):
headers[b'Content-Type'] = b'text/event-stream'
headers[b'Cache-Control'] = b'no-cache'
else:
headers += [
(b'Content-Type', b'text/event-stream'),
(b'Cache-Control', b'no-cache'),
]
return await super().send_headers(status=status, headers=headers)
async def accept(self):
"""
Accept the SSE connection.
Sends a 200 Ok with the appropriate Content-Type and such.
"""
await self.send_headers(status=200)
# Force sending headers immediately
await self.send_body(b"", more_body=True)
async def reject(self, *, status, headers=None, body=None):
"""
Reject the SSE, sending an error and terminating the connection.
"""
await self.send_headers(status=status, headers=headers)
await self.send_body(body, more_body=False)
await self.disconnect()
raise StopConsumer()
async def send_event(self, *, data, **fields):
"""
Sends an event to the client, with the fields as keyword arguments.
The data field is required.
Other fields specified in HTML5 (section 9.2):
* event: the event type
* id: the event ID (used to when reconnecting)
* retry: time to wait before reconnecting, in seconds
"""
fields['data'] = data
await self.send_body(
b"\n".join(
f"{name}: {line}".encode('utf-8')
for name, value in fields.items()
for line in str(value).replace('\r\n', '\n').replace('\r', '\n').split('\n')
) + b"\n\n",
more_body=True,
)
async def send_event_json(self, *, data, **fields):
await self.send_event(data=json.dumps(data), **fields)
async def terminate(self):
"""
Kill the connection from the server side.
"""
await self.send_body(b"", more_body=False)
await self.disconnect()
raise StopConsumer()
# Overridables
async def connect(self):
"""
Called when the SSE is opened
"""
# Default: Just accept the connection
await self.accept()
async def disconnect(self):
"""
Overrideable place to run disconnect handling. Do not send anything
from here.
"""
pass
# Partial implementation of an SSE client on requests
# This needs work to be more generic
import json
import requests
import socket
import time
import urllib3
class HTTPAdapterWithKeepalive(requests.adapters.HTTPAdapter):
# Keepalive parameters
interval_sec = 30
idle_sec = interval_sec
max_fails = 5
def init_poolmanager(self, *args, **kwargs):
sockopts = urllib3.connection.HTTPConnection.default_socket_options + []
if hasattr(socket, 'SO_KEEPALIVE'):
# Not Windows
sockopts += [
(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
]
if hasattr(socket, 'TCP_KEEPALIVE'):
# Mac
sockopts += [
(socket.IPPROTO_TCP, socket.TCP_KEEPALIVE, self.interval_sec)
]
if hasattr(socket, 'TCP_KEEPIDLE'):
# Linux
sockopts += [
(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, self.idle_sec)
]
if hasattr(socket, 'TCP_KEEPINTVL'):
# Linux
sockopts += [
(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, self.interval_sec)
]
if hasattr(socket, 'TCP_KEEPCNT'):
# Linux
sockopts += [
(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, self.max_fails)
]
# Windows:
# sock.ioctl(socket.SIO_KEEPALIVE_VALS, (<1 to turn on>, <idle time in ms>, <interval in ms>))
# https://msdn.microsoft.com/en-us/library/dd877220%28v=vs.85%29.aspx
super().init_poolmanager(*args, socket_options=sockopts, **kwargs)
def get_session():
adapter = HTTPAdapterWithKeepalive()
s = requests.session()
s.mount("http://", adapter)
s.mount("https://", adapter)
return s
def stream_raw_sse(mkrequest, *pargs, _last_event_id=None, headers=None, **kwargs):
"""
Streams Server-Sent Events, each event produced as a sequence of
(field, value) pairs.
Does not handle reconnection, etc.
"""
if headers is None:
headers = {}
headers['Accept'] = 'text/event-stream'
headers['Cache-Control'] = 'no-cache'
# Per https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model
if _last_event_id is not None:
headers['Last-Event-ID'] = _last_event_id
with mkrequest(*pargs, headers=headers, stream=True, **kwargs) as resp:
fields = []
for line in resp.iter_lines(decode_unicode=True):
# https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
if not line:
yield fields
fields = []
elif line.startswith(':'):
pass
elif ':' in line:
field, value = line.split(':', 1)
if value.startswith(' '):
value = value[1:]
fields += [(field, value)]
else: # Non-blank, without a colon
fields += [(line, '')]
def stream_sse(mkrequest, *pargs, **kwargs):
"""
Streams server-sent events, producing a dictionary of the fields.
Handles reconnecting, Last-Event-ID, and retry waits.
Deviates by spec by passing through unknown fields instead of ignoring them.
If an unknown field is given more than once, the last given wins (like
event and id).
"""
retry = 0
last_id = None
while True:
try:
for rawmsg in stream_raw_sse(mkrequest, *pargs, _last_event_id=last_id, **kwargs):
msg = {'event': 'message', 'data': ''}
# https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
for k, v in rawmsg:
if k == 'retry':
try:
retry = int(v)
except ValueError:
pass
elif k == 'data':
if msg['data']:
msg['data'] += '\n' + v
else:
msg['data'] = v
else:
if k == 'id':
last_id = v
# Spec says we should ignore unknown fields. We're passing them on.
msg[k] = v
if not msg['data']:
pass
yield msg
else:
raise StopIteration # Really just exists to get caught in the next line
except (StopIteration, requests.RequestException, EOFError):
# End of stream, try to reconnect
# NOTE: GeneratorExit is thrown if the consumer kills us (or we get GC'd)
# TODO: Log something?
# Wait, fall through, and start at the top
time.sleep(retry / 1000)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment