Skip to content

Instantly share code, notes, and snippets.

@nodef0
Created August 17, 2019 22:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nodef0/6bd203ffb549433b78e30de5c5f98731 to your computer and use it in GitHub Desktop.
Save nodef0/6bd203ffb549433b78e30de5c5f98731 to your computer and use it in GitHub Desktop.
// Example: merging two async streams with different item types into a single stream.
// Based on the `websocket` crate's `examples/async-server.rs`.
use websocket::message::{Message, OwnedMessage};
use websocket::r#async::Server;
use websocket::server::InvalidConnection;
use futures::{future, Future, Sink, Stream};
use tokio::runtime::TaskExecutor;
use core::fmt::Debug;
use std::time::{Duration, Instant};
use tokio::timer::Interval;
use websocket::result::WebSocketError;
/// Common item type for the multiplexed per-connection event stream.
///
/// Both the periodic timer and the websocket are mapped into this enum so
/// that they can be merged into one stream.
enum MuxEvent {
    /// The periodic timer fired.
    Tick,
    /// A message arrived from the websocket.
    Socket(OwnedMessage),
}
/// Common error type for the multiplexed per-connection event stream.
#[derive(Debug)]
enum MuxErr {
    /// The periodic timer failed; the underlying timer error is discarded.
    Timer,
    /// The websocket reported an error.
    Socket(WebSocketError),
}
impl From<WebSocketError> for MuxErr {
fn from(error: WebSocketError) -> Self {
MuxErr::Socket(error)
}
}
fn main() {
let mut runtime = tokio::runtime::Builder::new().build().unwrap();
let executor = runtime.executor();
// bind to the server
let server = Server::bind("localhost:2794", &tokio::reactor::Handle::default()).unwrap();
// time to build the server's future
// this will be a struct containing everything the server is going to do
// a stream of incoming connections
let f = server
.incoming()
.then(future::ok) // wrap good and bad events into future::ok
.filter(|event| {
match event {
Ok(_) => true, // a good connection
Err(InvalidConnection { ref error, .. }) => {
println!("Bad client: {}", error);
false // we want to save the stream if a client cannot make a valid handshake
}
}
})
.and_then(|event| event) // unwrap good connections
.map_err(|_| ()) // and silently ignore errors (in `.filter`)
.for_each(move |(upgrade, addr)| {
println!("Got a connection from: {}", addr);
// check if it has the protocol we want
if !upgrade.protocols().iter().any(|s| s == "rust-websocket") {
// reject it if it doesn't
spawn_future(upgrade.reject(), "Upgrade Rejection", &executor);
return Ok(());
}
// accept the request to be a ws connection if it does
let f = upgrade
.use_protocol("rust-websocket")
.accept()
// send a greeting!
.and_then(|(s, _)| s.send(Message::text("Hello World!").into()))
// wrap into the common type
.map_err(MuxErr::Socket)
// simple echo server impl
.and_then(|s| {
let events = s.map(MuxEvent::Socket).map_err(MuxErr::Socket);
// create a periodic timer for the connection
// and map the event stream and the timer to a common Stream Item | Error
let periodic = Interval::new(Instant::now(), Duration::from_millis(500))
.map(|_| MuxEvent::Tick)
.map_err(|_| MuxErr::Timer);
let (sink, stream) = events.split();
// transform the sink's error type to MuxErr
// in order to use .forward()
let sink_mux = sink.sink_from_err::<MuxErr>();
stream
.select(periodic)
.take_while(|m| match m {
MuxEvent::Socket(m) => Ok(!m.is_close()),
_ => Ok(true),
})
.filter_map(move |m| match m {
MuxEvent::Socket(m) => {
println!("Message from Client: {:?}", m);
match m {
OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)),
OwnedMessage::Pong(_) => None,
_ => Some(m),
}
}
MuxEvent::Tick => {
println!("Tick");
None
}
})
.forward(sink_mux)
.and_then(|(_, sink)| sink.send(OwnedMessage::Close(None)))
});
spawn_future(f, "Client Status", &executor);
Ok(())
});
runtime.block_on(f).unwrap();
}
/// Spawns `f` on `executor` and reports its outcome on stdout.
///
/// The future's item is discarded. On success `"<desc>: Finished."` is
/// printed; on failure the error is printed with its `Debug` representation,
/// prefixed by `desc`.
fn spawn_future<F, I, E>(f: F, desc: &'static str, executor: &TaskExecutor)
where
    F: Future<Item = I, Error = E> + 'static + Send,
    E: Debug,
{
    // `map` and `map_err` act on disjoint outcomes, so exactly one of the
    // two messages is printed when the future resolves.
    let reported = f
        .map(move |_| println!("{}: Finished.", desc))
        .map_err(move |err| println!("{}: '{:?}'", desc, err));
    executor.spawn(reported);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment