Skip to content

Instantly share code, notes, and snippets.

@sangelxyz
Created March 25, 2024 04:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sangelxyz/1896431bf005ea93625c37a698518eac to your computer and use it in GitHub Desktop.
Save sangelxyz/1896431bf005ea93625c37a698518eac to your computer and use it in GitHub Desktop.
server-sent-events-cors
// Server-Sent Events (SSE) with tokio broadcast channel - with cors
// Note: seems to be cors error still while connecting.
use async_stream::stream;
use axum:: {
extract::State, response::{sse::{Event, KeepAlive, Sse}, IntoResponse}, routing::get, Json, Router
};
use std::{
convert::Infallible, sync::Arc, time::Duration
};
use futures::stream::Stream;
use tokio::sync::broadcast;
use tokio::sync::broadcast::Sender;
use serde::Serialize;
use tokio::sync::Mutex;
use tower_http::cors::{Any, CorsLayer};
#[tokio::main]
async fn main() {
let (tx, _rx) = broadcast::channel(10);
let wrapped_rx = tx.clone();
let cors = CorsLayer::new()
.allow_methods([Method::GET])
.allow_origin(Any);
let app = Router::new()
.route("/", get(sse_handler))
.layer(cors)
.with_state(wrapped_rx);
tokio::spawn(async move {
let mut count = 0;
loop {
tx.send(format!("SSE event {}", count)).unwrap();
count += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
async fn sse_handler(State(tx): State<Sender<String>>) -> Sse<impl Stream<Item = Result<Event, Infallible>>> {
// broadcast::Receiver
let mut rx = tx.subscribe();
Sse::new(stream! {
while let Ok(msg) = rx.recv().await {
yield Ok(Event::default().data::<String>(msg));
}
}).keep_alive(KeepAlive::default())
}
async fn handler() -> impl IntoResponse {
let body = "Hello, world!";
Json(body)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment