Skip to content

Instantly share code, notes, and snippets.

@codesections
Last active May 16, 2022 23:48
Show Gist options
  • Save codesections/7827e780876f7cdfd233e2570f9dec2c to your computer and use it in GitHub Desktop.
Save codesections/7827e780876f7cdfd233e2570f9dec2c to your computer and use it in GitHub Desktop.
Warp_proof_of_concept
use futures::{Async, Future, Poll};
use tokio::io::{AsyncRead, AsyncWrite, Error, ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use warp::{path, Filter, Stream};
/// Wraps the read half of a split `TcpStream` so it can be exposed as a
/// `Stream` of `String` chunks (see the `Stream` impl for this type).
struct Receiver {
/// Read half of the TCP connection; polled for incoming bytes.
rx: ReadHalf<TcpStream>,
}
/// Exposes the bytes arriving on the socket as a stream of `String` chunks.
///
/// Each successful read yields one item holding whatever bytes were available
/// (at most 1000), decoded lossily as UTF-8. Chunk boundaries follow the
/// socket reads, not message boundaries, so a multi-byte character split
/// across two reads is decoded as replacement characters.
impl Stream for Receiver {
    type Item = String;
    type Error = Error;

    /// Performs a single non-blocking read.
    ///
    /// Returns `Ready(Some(chunk))` when bytes were read, `Ready(None)` when
    /// the peer closed the connection, `NotReady` when no data is available
    /// yet, and propagates any I/O error from the underlying read.
    fn poll(&mut self) -> Poll<Option<String>, Self::Error> {
        let mut buffer = [0u8; 1000];
        match self.rx.poll_read(&mut buffer)? {
            // A zero-byte read means end-of-file: finish the stream instead
            // of yielding empty strings forever.
            Async::Ready(0) => Ok(Async::Ready(None)),
            Async::Ready(num_bytes_read) => Ok(Async::Ready(Some(
                String::from_utf8_lossy(&buffer[..num_bytes_read]).into_owned(),
            ))),
            Async::NotReady => Ok(Async::NotReady),
        }
    }
}
/// Wraps the write half of a split `TcpStream`; its `Future` impl writes the
/// subscribe command to that half.
struct Sender {
/// Write half of the TCP connection that the command is written to.
tx: WriteHalf<TcpStream>,
}
impl Future for Sender {
type Item = ();
type Error = Box<Error>;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let subscribe_cmd = "*2\r\n$9\r\nsubscribe\r\n$15\r\ntimeline:public\r\n";
let mut buffer = subscribe_cmd.as_bytes();
self.tx.poll_write(&mut buffer)?;
return Ok(Async::NotReady);
}
}
/// Opens a TCP connection to `127.0.0.1:6379`.
///
/// Resolves to the connected `TcpStream`. A connection error is printed to
/// stderr and the future then fails with `()`.
///
/// # Panics
///
/// Panics if the hard-coded address literal fails to parse.
fn get_socket() -> impl Future<Item = TcpStream, Error = ()> {
    let address = "127.0.0.1:6379".parse().expect("Unable to parse address");
    TcpStream::connect(&address).map_err(|e| eprintln!("{}", e))
}
fn send_subscribe_cmd(tx: WriteHalf<TcpStream>) {
let sender = Sender { tx };
tokio::spawn(sender.map_err(|e| eprintln!("{}", e)));
}
/// Serves `GET /api/v1/streaming/public` on `127.0.0.1:3030` as a
/// server-sent-events endpoint.
///
/// Each request opens its own socket via `get_socket`, spawns the subscribe
/// command on the write half, and forwards every chunk read from the read
/// half to the client as an SSE `data` event.
fn main() {
    let routes = warp::path!("api" / "v1" / "streaming" / "public")
        .and_then(|| {
            get_socket()
                .map(|socket| {
                    let (rx, tx) = socket.split();
                    send_subscribe_cmd(tx);
                    Receiver { rx }
                })
                // NOTE(review): a failed backend connection is surfaced to the
                // client as "not found"; confirm that is the intended status.
                .map_err(|_| warp::reject::not_found())
        })
        .and(warp::sse())
        .map(|event_stream: Receiver, sse: warp::sse::Sse| {
            sse.reply(event_stream.map(|item| warp::sse::data(item)))
        });

    warp::serve(routes).run(([127, 0, 0, 1], 3030));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment