Created
August 17, 2019 22:16
-
-
Save nodef0/6bd203ffb549433b78e30de5c5f98731 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Example of merging two async streams of different types into one | |
// Using websocket's example/async-server.rs as basis | |
use websocket::message::{Message, OwnedMessage}; | |
use websocket::r#async::Server; | |
use websocket::server::InvalidConnection; | |
use futures::{future, Future, Sink, Stream}; | |
use tokio::runtime::TaskExecutor; | |
use core::fmt::Debug; | |
use std::time::{Duration, Instant}; | |
use tokio::timer::Interval; | |
use websocket::result::WebSocketError; | |
enum MuxEvent { | |
Tick, | |
Socket(OwnedMessage), | |
} | |
#[derive(Debug)] | |
enum MuxErr { | |
Timer, | |
Socket(WebSocketError), | |
} | |
impl From<WebSocketError> for MuxErr { | |
fn from(error: WebSocketError) -> Self { | |
MuxErr::Socket(error) | |
} | |
} | |
fn main() { | |
let mut runtime = tokio::runtime::Builder::new().build().unwrap(); | |
let executor = runtime.executor(); | |
// bind to the server | |
let server = Server::bind("localhost:2794", &tokio::reactor::Handle::default()).unwrap(); | |
// time to build the server's future | |
// this will be a struct containing everything the server is going to do | |
// a stream of incoming connections | |
let f = server | |
.incoming() | |
.then(future::ok) // wrap good and bad events into future::ok | |
.filter(|event| { | |
match event { | |
Ok(_) => true, // a good connection | |
Err(InvalidConnection { ref error, .. }) => { | |
println!("Bad client: {}", error); | |
false // we want to save the stream if a client cannot make a valid handshake | |
} | |
} | |
}) | |
.and_then(|event| event) // unwrap good connections | |
.map_err(|_| ()) // and silently ignore errors (in `.filter`) | |
.for_each(move |(upgrade, addr)| { | |
println!("Got a connection from: {}", addr); | |
// check if it has the protocol we want | |
if !upgrade.protocols().iter().any(|s| s == "rust-websocket") { | |
// reject it if it doesn't | |
spawn_future(upgrade.reject(), "Upgrade Rejection", &executor); | |
return Ok(()); | |
} | |
// accept the request to be a ws connection if it does | |
let f = upgrade | |
.use_protocol("rust-websocket") | |
.accept() | |
// send a greeting! | |
.and_then(|(s, _)| s.send(Message::text("Hello World!").into())) | |
// wrap into the common type | |
.map_err(MuxErr::Socket) | |
// simple echo server impl | |
.and_then(|s| { | |
let events = s.map(MuxEvent::Socket).map_err(MuxErr::Socket); | |
// create a periodic timer for the connection | |
// and map the event stream and the timer to a common Stream Item | Error | |
let periodic = Interval::new(Instant::now(), Duration::from_millis(500)) | |
.map(|_| MuxEvent::Tick) | |
.map_err(|_| MuxErr::Timer); | |
let (sink, stream) = events.split(); | |
// transform the sink's error type to MuxErr | |
// in order to use .forward() | |
let sink_mux = sink.sink_from_err::<MuxErr>(); | |
stream | |
.select(periodic) | |
.take_while(|m| match m { | |
MuxEvent::Socket(m) => Ok(!m.is_close()), | |
_ => Ok(true), | |
}) | |
.filter_map(move |m| match m { | |
MuxEvent::Socket(m) => { | |
println!("Message from Client: {:?}", m); | |
match m { | |
OwnedMessage::Ping(p) => Some(OwnedMessage::Pong(p)), | |
OwnedMessage::Pong(_) => None, | |
_ => Some(m), | |
} | |
} | |
MuxEvent::Tick => { | |
println!("Tick"); | |
None | |
} | |
}) | |
.forward(sink_mux) | |
.and_then(|(_, sink)| sink.send(OwnedMessage::Close(None))) | |
}); | |
spawn_future(f, "Client Status", &executor); | |
Ok(()) | |
}); | |
runtime.block_on(f).unwrap(); | |
} | |
fn spawn_future<F, I, E>(f: F, desc: &'static str, executor: &TaskExecutor) | |
where | |
F: Future<Item = I, Error = E> + 'static + Send, | |
E: Debug, | |
{ | |
executor.spawn( | |
f.map_err(move |e| println!("{}: '{:?}'", desc, e)) | |
.map(move |_| println!("{}: Finished.", desc)), | |
); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment