Skip to content

Instantly share code, notes, and snippets.

@sangelxyz
Last active March 25, 2024 01:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sangelxyz/fe47e931f3536289a798eea7b5d21184 to your computer and use it in GitHub Desktop.
Save sangelxyz/fe47e931f3536289a798eea7b5d21184 to your computer and use it in GitHub Desktop.
// Server-Sent Events (SSE) with tokio broadcast channel
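// Update: the shared state no longer uses Arc/Mutex; the broadcast `Sender` is cloned directly
// (the `Arc` and `Mutex` imports below appear to be unused leftovers).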
use async_stream::stream;
use axum:: {
extract::State, response::{sse::{Event, KeepAlive, Sse}, IntoResponse}, routing::get, Json, Router
};
use std::{
convert::Infallible, sync::Arc, time::Duration
};
use futures::stream::Stream;
use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use serde::Serialize;
use tokio::sync::Mutex;
/// Entry point: starts a background task that broadcasts one numbered message
/// per second, then serves the `/` and `/sse/` routes on `0.0.0.0:3000`.
#[tokio::main]
async fn main() {
    // Broadcast channel with capacity 10. The receiver half stays bound to
    // `_idle_rx` for the whole of `main`; per tokio's broadcast docs, `send`
    // only errors when no receiver exists, so the `unwrap` in the producer
    // below relies on this binding staying alive.
    let (events_tx, _idle_rx) = broadcast::channel(10);

    // Background producer: publish a numbered message, then sleep one second,
    // forever.
    let producer_tx = events_tx.clone();
    tokio::spawn(async move {
        let mut seq = 0;
        loop {
            let message = format!("SSE event {}", seq);
            producer_tx.send(message).unwrap();
            seq += 1;
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });

    // The sender is the router state; `sse_handler` subscribes a fresh
    // receiver from it for every request.
    let router = Router::new()
        .route("/", get(handler))
        .route("/sse/", get(sse_handler))
        .with_state(events_tx);

    let socket = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(socket, router).await.unwrap();
}
/// `/sse/` route handler: streams every broadcast message to the client as a
/// Server-Sent Event.
///
/// A new receiver is subscribed from the shared `Sender` for each request, so
/// a client only sees messages sent after it connected. The stream ends only
/// when the channel is closed (all senders dropped). If the client falls
/// behind and the channel overwrites messages it has not read yet, the missed
/// messages are skipped and streaming continues instead of terminating the
/// connection.
async fn sse_handler(State(tx): State<Sender<String>>) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
    // Subscribe eagerly (outside the stream) so the subscription starts when
    // the handler runs, not when the response body is first polled.
    let mut rx = tx.subscribe();
    Sse::new(stream! {
        loop {
            match rx.recv().await {
                Ok(msg) => {
                    yield Ok(Event::default().data(msg));
                }
                // The receiver lagged past the channel capacity; tokio has
                // already moved its cursor to the oldest retained message, so
                // the next `recv` resumes from there.
                Err(broadcast::error::RecvError::Lagged(_)) => continue,
                // Every sender is gone: nothing more will ever arrive.
                Err(broadcast::error::RecvError::Closed) => break,
            }
        }
    }).keep_alive(KeepAlive::default())
}
/// `/` route handler: responds with the greeting string wrapped in `Json`,
/// i.e. the body is the JSON string `"Hello, world!"`.
async fn handler() -> impl IntoResponse {
    Json("Hello, world!")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment