Skip to content

Instantly share code, notes, and snippets.

@codesections
Last active November 4, 2021 10:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save codesections/e3adcf0984ae359e3d62ff2dca7353fe to your computer and use it in GitHub Desktop.
Save codesections/e3adcf0984ae359e3d62ff2dca7353fe to your computer and use it in GitHub Desktop.
A proof of concept showing how to stream messages from a Redis pub/sub channel to clients as Server-Sent Events
use futures::{Async, Poll};
use redis;
use std::time::Duration;
use warp::{path, Filter, Stream};
/// Source of Redis pub/sub messages that is exposed to warp as a `Stream`.
///
/// The connection is created in `OutputStream::new` and is borrowed by
/// `poll` each time it needs a pub/sub handle.
struct OutputStream {
/// Redis connection that `poll` turns into a pub/sub handle via `as_pubsub`.
con: redis::Connection,
}
impl OutputStream {
    /// Connects to the Redis server at `redis://127.0.0.1:6379` and wraps the
    /// resulting connection.
    ///
    /// # Panics
    ///
    /// Panics if the client cannot be created from the URL or if the
    /// connection cannot be obtained.
    fn new() -> Self {
        // Build the client and open the connection in one chain; either
        // failure surfaces through the single `unwrap` below.
        let con = redis::Client::open("redis://127.0.0.1:6379")
            .and_then(|client| client.get_connection())
            .unwrap();
        Self { con }
    }
}
impl Stream for OutputStream {
type Item = String;
type Error = std::io::Error;
fn poll(&mut self) -> Poll<Option<String>, std::io::Error> {
let mut pubsub = self.con.as_pubsub();
pubsub.subscribe("timeline:1").unwrap();
pubsub.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
match pubsub.get_message() {
Err(_) => Ok(Async::NotReady),
Ok(msg) => Ok(Async::Ready(Some(msg.get_payload().unwrap()))),
}
}
}
/// Serves `GET /api/v1/streaming/public` on `127.0.0.1:3030`, replying with a
/// Server-Sent Events stream fed by a fresh `OutputStream` per request.
fn main() {
    let public_timeline = warp::path!("api" / "v1" / "streaming" / "public")
        .and(warp::sse())
        .map(|sse: warp::sse::Sse| {
            // Each request gets its own Redis-backed stream; every payload is
            // wrapped as an SSE `data` event. The no-op `inspect` is carried
            // over unchanged from the original pipeline.
            let events = OutputStream::new()
                .inspect(|_| {})
                .map(|payload| warp::sse::data(payload));
            // Keep-alive interval handed to `warp::sse::keep`.
            let keep_alive = Some(Duration::from_secs(1));
            sse.reply(warp::sse::keep(events, keep_alive))
        });

    warp::serve(public_timeline).run(([127, 0, 0, 1], 3030));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment