Skip to content

Instantly share code, notes, and snippets.

@jbr
Last active April 15, 2020 22:13
Show Gist options
  • Save jbr/5d9f9c1e628b97c9e20984ee3eb4fb4c to your computer and use it in GitHub Desktop.
Save jbr/5d9f9c1e628b97c9e20984ee3eb4fb4c to your computer and use it in GitHub Desktop.
hacked-up async_sse
If nobody is listening to an event stream, its events are never encoded,
because encoding happens on the consuming end of the channel.
library users can use StreamExt to subset/filter/transform events
before they're encoded, since user-defined events are on the channel,
not already-encoded text
channel specifics are outside of the purview of this code. as long as
it implements a stream of Events, it can be encoded as an SSE
response. if there's some sort of fancy fanout logic that isn't
expressed with a BroadcastChannel and StreamExt, that'll still be
workable
each event is encoded as a single contiguous u8 slice, ensuring that lines can't be
accidentally interleaved (seems possible with current async_sse?)
---
Biggest disadvantage:
Events are encoded once per listener, which is potentially expensive
with a lot of connections. Ideally the encoded form would be cached
somewhere, but I'm not comfortable enough with Rust data structures to
know the right way to do this. Perhaps some sort of weak map, where the
value is dropped when the key is dropped?
// this file is entirely "library user code"
mod sse;
use async_std::{
stream::StreamExt,
sync::{Arc, Mutex},
};
use broadcaster::BroadcastChannel;
use sse::{Event, EventStream};
use tide::Response;
use MyEvent::*;
/// The two example event kinds that this demo server broadcasts.
#[derive(Clone, Debug)]
enum MyEvent {
/// Broadcast by `send_a` (`POST /a`).
A,
/// Broadcast by `send_b` (`POST /b`).
B,
}
/// Makes `MyEvent` encodable as a server-sent event.
impl Event for MyEvent {
    /// Event name: each variant reports its own name, so the `event:` field
    /// agrees with the payload and with the `/a` and `/b` routes.
    fn name(&self) -> &str {
        match self {
            A => "A",
            B => "B",
        }
    }

    /// Fixed greeting payload for each variant.
    fn data(&self) -> &[u8] {
        match self {
            A => "hey from an A message".as_bytes(),
            B => "hello from a B message".as_bytes(),
        }
    }

    /// These demo events carry no id.
    fn id(&self) -> Option<&str> {
        None
    }
}
// Request type taken by every handler below: the tide application state is one
// `BroadcastChannel<MyEvent>` wrapped in `Arc<Mutex<..>>`.
type Request = tide::Request<Arc<Mutex<BroadcastChannel<MyEvent>>>>;
/// Broadcasts an `A` event on the shared channel and replies with status 200.
///
/// Panics if the channel reports a send error.
async fn send_a(request: Request) -> Response {
    // The lock guard is a temporary of this statement, so it is released
    // before the result is inspected.
    let outcome = request.state().lock().await.send(&A).await;
    outcome.unwrap();
    Response::new(200)
}
/// Broadcasts a `B` event on the shared channel and replies with status 200.
///
/// Panics if the channel reports a send error.
async fn send_b(request: Request) -> Response {
    // The lock guard is a temporary of this statement, so it is released
    // before the result is inspected.
    let outcome = request.state().lock().await.send(&B).await;
    outcome.unwrap();
    Response::new(200)
}
/// Responds with an event stream carrying every event sent on the channel.
async fn stream_all(request: Request) -> Response {
    // Clone the channel under the lock; the clone is what gets streamed.
    let channel = request.state().lock().await.clone();
    channel.into_response()
}
/// Responds with an event stream restricted to `A` events.
async fn stream_only_a(request: Request) -> Response {
    let channel = request.state().lock().await.clone();
    // Filtering happens on the un-encoded events, before the SSE encoder sees them.
    let only_a = channel.filter(|event| matches!(event, A));
    only_a.into_response()
}
/// Responds with an event stream restricted to `B` events.
async fn stream_only_b(request: Request) -> Response {
    let channel = request.state().lock().await.clone();
    // Filtering happens on the un-encoded events, before the SSE encoder sees them.
    let only_b = channel.filter(|event| matches!(event, B));
    only_b.into_response()
}
/// Entry point: builds the shared broadcast channel, registers the routes and
/// serves on 127.0.0.1:3131.
#[async_std::main]
async fn main() {
    let shared_channel = Arc::new(Mutex::new(BroadcastChannel::new()));
    let mut app = tide::with_state(shared_channel);

    // POST publishes an event of that kind; GET subscribes to it.
    app.at("/a").post(send_a).get(stream_only_a);
    app.at("/b").post(send_b).get(stream_only_b);
    // The root path subscribes to everything.
    app.at("/").get(stream_all);

    app.listen("127.0.0.1:3131").await.unwrap();
}
// in case anyone sees this without context, a lot of this is from
// https://github.com/http-rs/async-sse/blob/master/src/encoder.rs
use async_std::{
stream::Stream,
io::{BufReader, Read as AsyncRead},
task::{ready, Context, Poll},
};
use std::{io, pin::Pin};
pin_project_lite::pin_project! {
/// An SSE protocol encoder.
///
/// Wraps a stream of events (`receiver`) and serves their encoded bytes through
/// its `AsyncRead` implementation; an event is only encoded when a reader asks
/// for bytes.
#[derive(Debug, Clone)]
pub struct Encoder<S> {
// Encoded bytes of the event currently being handed to the reader; `None`
// when the next event still has to be pulled from `receiver`.
buf: Option<Vec<u8>>,
// Source of the events to encode.
#[pin]
receiver: S,
// Running count of bytes of `buf` already handed to the reader; reset to 0
// together with `buf`.
cursor: usize,
}
}
/// Serves the encoded form of each event from the wrapped stream as a byte stream.
impl<E, S> AsyncRead for Encoder<S>
where
    E: Event,
    S: Unpin + Stream<Item = E>,
{
    /// Copies as much of the current encoded event as fits into `buf`.
    ///
    /// When no event is buffered, the next one is polled from the wrapped
    /// stream and encoded first. Returns `Poll::Pending` while the stream has
    /// no event ready, `Ok(0)` once the stream has ended, and otherwise the
    /// number of bytes written to `buf`. An event larger than `buf` is
    /// delivered over several calls, resuming at `cursor` each time.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        // `S: Unpin` makes the encoder itself `Unpin`, so a plain mutable
        // reference is available and lets the fields be borrowed separately.
        let this = &mut *self;

        // Fetch and encode the next event if nothing is buffered.
        if this.buf.is_none() {
            log::trace!("> waiting for event");
            match ready!(Pin::new(&mut this.receiver).poll_next(cx)) {
                Some(event) => {
                    let encoded = encode(&event);
                    log::trace!("> Received a new event with len {}", encoded.len());
                    this.buf = Some(encoded);
                    this.cursor = 0;
                }
                None => {
                    log::trace!("> Encoder done reading");
                    return Poll::Ready(Ok(0));
                }
            }
        }

        let encoded = this
            .buf
            .as_ref()
            .expect("an encoded event is always buffered at this point");

        // Continue from where the previous call stopped. Copying from the
        // start each time would repeat the prefix of any event larger than
        // `buf` and could leave `cursor` permanently out of step with the length.
        let remaining = &encoded[this.cursor..];
        let written = buf.len().min(remaining.len());
        buf[..written].copy_from_slice(&remaining[..written]);
        this.cursor += written;

        // Drop the buffer once every byte has been handed out.
        let finished = this.cursor == encoded.len();
        if finished {
            this.buf = None;
            this.cursor = 0;
        }

        Poll::Ready(Ok(written))
    }
}
/// A value that can be encoded as a server-sent event.
///
/// `encode` reads these three accessors to build the frame for one event.
pub trait Event {
/// Text emitted in the frame's `event:` field.
fn name(&self) -> &str;
/// Payload bytes emitted in the frame's `data:` field.
fn data(&self) -> &[u8];
/// Optional identifier; when `Some`, it is emitted in an `id:` field.
fn id(&self) -> Option<&str>;
}
/// Conversion of an event source into an SSE encoder or a tide response.
///
/// A blanket implementation later in this file covers every
/// `Send + Unpin + 'static` stream whose items implement `Event`.
pub trait EventStream: Sized + Unpin + Send {
/// Wraps `self` in an `Encoder`.
fn into_encoder(self) -> Encoder<Self>;
/// Converts `self` into a `tide::Response`.
fn into_response(self) -> tide::Response;
}
/// Encodes one event as a complete server-sent-event frame.
///
/// The frame is `event:<name>\n`, then `id:<id>\n` when the event has an id,
/// then one `data:<line>\n` per line of the payload, then a blank line that
/// terminates the event. A payload without line breaks yields exactly one
/// `data:` line. Payload line breaks (`\n`, `\r\n` or `\r`) start a new
/// `data:` line, so a multi-line payload cannot end the frame early or be
/// mistaken for other fields.
fn encode<E: Event>(event: &E) -> Vec<u8> {
    log::trace!("> encoding event ");

    let name = event.name();
    let payload = event.data();
    let mut frame = Vec::with_capacity(name.len() + payload.len() + 16);

    frame.extend_from_slice(b"event:");
    frame.extend_from_slice(name.as_bytes());
    frame.push(b'\n');

    if let Some(id) = event.id() {
        frame.extend_from_slice(b"id:");
        frame.extend_from_slice(id.as_bytes());
        frame.push(b'\n');
    }

    // Emit the payload line by line; an empty payload still gets one empty
    // `data:` line.
    let mut rest = payload;
    loop {
        frame.extend_from_slice(b"data:");
        match rest.iter().position(|&byte| byte == b'\n' || byte == b'\r') {
            Some(end) => {
                frame.extend_from_slice(&rest[..end]);
                frame.push(b'\n');
                // Treat "\r\n" as a single line break.
                let is_crlf = rest[end] == b'\r' && rest.get(end + 1) == Some(&b'\n');
                let skip = if is_crlf { 2 } else { 1 };
                rest = &rest[end + skip..];
            }
            None => {
                frame.extend_from_slice(rest);
                frame.push(b'\n');
                break;
            }
        }
    }

    // Blank line: end of event.
    frame.push(b'\n');
    frame
}
use tide::IntoResponse;
/// Lets an `Encoder` be returned from a tide handler as a streaming body.
impl<S, E> IntoResponse for Encoder<S>
where
    S: Send + Unpin + Stream<Item = E> + 'static,
    E: Event,
{
    /// Builds a 200 response whose body reads from this encoder, with the
    /// `cache-control` and `content-type` headers set for an event stream.
    fn into_response(self) -> tide::Response {
        let body = BufReader::new(self);
        let response = tide::Response::with_reader(200, body);
        let response = response.set_header("cache-control", "no-cache");
        response.set_header("content-type", "text/event-stream")
    }
}
impl<E: Event, S: Send + Unpin + Stream<Item = E> + 'static> EventStream for S {
fn into_encoder(self) -> Encoder<Self> {
Encoder {
receiver: self,
buf: None,
cursor: 0,
}
}
fn into_response(self) -> tide::Response {
self.into_encoder().into_response()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment