Created
October 29, 2018 05:39
-
-
Save Leonti/35297de9bb6fb6c398d443684bf932c6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! A chat server that broadcasts a message to all connections. | |
//! | |
//! This example is explicitly more verbose than it has to be. This is to | |
//! illustrate more concepts. | |
//! | |
//! A chat server for telnet clients. After a telnet client connects, the first | |
//! line should contain the client's name. After that, all lines sent by a | |
//! client are broadcasted to all other connected clients. | |
//! | |
//! Because the client is telnet, lines are delimited by "\r\n". | |
//! | |
//! You can test this out by running: | |
//! | |
//! cargo run --example chat | |
//! | |
//! And then in another terminal run: | |
//! | |
//! telnet localhost 6142 | |
//! | |
//! You can run the `telnet` command in any number of additional windows. | |
//! | |
//! You can run the second command in multiple windows and then chat between the | |
//! two, seeing the messages from the other client as they're received. For all | |
//! connected clients they'll all join the same room and see everyone else's | |
//! messages. | |
#![deny(warnings)] | |
extern crate tokio; | |
#[macro_use] | |
extern crate futures; | |
extern crate bytes; | |
use tokio::io; | |
use tokio::net::{TcpListener, TcpStream}; | |
use tokio::prelude::*; | |
use futures::sync::mpsc; | |
//use futures::future::{self, Either}; | |
use bytes::{BytesMut, Bytes, BufMut}; | |
use std::collections::HashMap; | |
use std::net::SocketAddr; | |
use std::sync::{Arc, Mutex}; | |
/// Shorthand for the transmit half of the message channel.
///
/// Unbounded, so `unbounded_send` never blocks the sending task.
type Tx = mpsc::UnboundedSender<Bytes>;

/// Shorthand for the receive half of the message channel.
type Rx = mpsc::UnboundedReceiver<Bytes>;
/// Data that is shared between all peers in the chat server.
///
/// This is the set of `Tx` handles for all connected clients. Whenever a
/// message is received from a client, it is broadcasted to all peers by
/// iterating over the `clients` entries and sending a copy of the message on
/// each `Tx`.
struct Shared {
    // Per-client senders, keyed by the client's socket address.
    clients: HashMap<SocketAddr, Tx>,
    // Server-side sender: each received line is also forwarded here so a
    // server-side task can observe the chat traffic.
    server_tx: Tx
}
/// The state for each connected client.
struct Client {
    /// The TCP socket wrapped with the `Lines` codec, defined below.
    ///
    /// This handles sending and receiving data on the socket. When using
    /// `Lines`, we can work at the line level instead of having to manage the
    /// raw byte operations.
    lines: Lines,

    /// Handle to the shared chat state.
    ///
    /// This is used to broadcast messages read off the socket to all connected
    /// peers.
    state: Arc<Mutex<Shared>>,

    /// Receive half of this client's message channel.
    ///
    /// This is used to receive messages from peers. When a message is received
    /// off of this `Rx`, it will be written to the socket.
    rx: Rx,

    /// Client socket address.
    ///
    /// The socket address is used as the key in the `clients` HashMap. The
    /// address is saved so that the `Client` drop implementation can clean up
    /// its entry.
    addr: SocketAddr,
}
/// Line based codec
///
/// This decorates a socket and presents a line based read / write interface.
///
/// As a user of `Lines`, we can focus on working at the line level. So, we send
/// and receive values that represent entire lines. The `Lines` codec will
/// handle the encoding and decoding as well as reading from and writing to the
/// socket.
#[derive(Debug)]
struct Lines {
    /// The TCP socket.
    socket: TcpStream,

    /// Buffer used when reading from the socket. Data is not returned from this
    /// buffer until an entire line has been read.
    rd: BytesMut,

    /// Buffer used to stage data before writing it to the socket.
    wr: BytesMut,
}
impl Shared { | |
/// Create a new, empty, instance of `Shared`. | |
fn new(server_tx: Tx) -> Self { | |
Shared { | |
clients: HashMap::new(), | |
server_tx | |
} | |
} | |
} | |
impl Client { | |
/// Create a new instance of `Peer`. | |
fn new(state: Arc<Mutex<Shared>>, | |
lines: Lines) -> Client | |
{ | |
// Get the client socket address | |
let addr = lines.socket.peer_addr().unwrap(); | |
// Create a channel for this peer | |
let (tx, rx) = mpsc::unbounded(); | |
// Add an entry for this `Peer` in the shared state map. | |
state.lock().unwrap() | |
.clients.insert(addr, tx); | |
Client { | |
lines, | |
state, | |
rx, | |
addr, | |
} | |
} | |
} | |
/// This is where a connected client is managed.
///
/// A `Client` is also a future representing completely processing the client.
///
/// When the socket closes, the `Client` future completes.
///
/// While processing, the future implementation will:
///
/// 1) Receive messages on its message channel and write them to the socket.
/// 2) Receive messages from the socket and broadcast them to all peers (and
///    forward a copy to the server-side channel).
///
impl Future for Client {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<(), io::Error> {
        // Tokio (and futures) use cooperative scheduling without any
        // preemption. If a task never yields execution back to the executor,
        // then other tasks may be starved.
        //
        // To deal with this, robust applications should not have any unbounded
        // loops. In this example, we will read at most `LINES_PER_TICK` lines
        // from the client on each tick.
        //
        // If the limit is hit, the current task is notified, informing the
        // executor to schedule the task again asap.
        const LINES_PER_TICK: usize = 10;

        // Drain up to `LINES_PER_TICK` messages that other peers queued for us.
        for i in 0..LINES_PER_TICK {
            // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is
            // safe.
            match self.rx.poll().unwrap() {
                Async::Ready(Some(v)) => {
                    // Buffer the line. Once all lines are buffered, they will
                    // be flushed to the socket (right below).
                    self.lines.buffer(&v);

                    // If this is the last iteration, the loop will break even
                    // though there could still be lines to read. Because we did
                    // not reach `Async::NotReady`, we have to notify ourselves
                    // in order to tell the executor to schedule the task again.
                    if i+1 == LINES_PER_TICK {
                        task::current().notify();
                    }
                }
                // `NotReady` or stream end: stop draining for this tick.
                _ => break,
            }
        }

        // Flush the write buffer to the socket.
        let _ = self.lines.poll_flush()?;

        // Read new lines from the socket.
        while let Async::Ready(line) = self.lines.poll()? {
            println!("Received line {:?}", line);

            if let Some(message) = line {
                // Re-append the "\r\n" terminator that the codec stripped,
                // since telnet peers expect line-delimited output.
                let mut line = BytesMut::new();
                line.extend_from_slice(&message);
                line.extend_from_slice(b"\r\n");

                // We're using `Bytes`, which allows zero-copy clones (by
                // storing the data in an Arc internally).
                //
                // However, before cloning, we must freeze the data. This
                // converts it from mutable -> immutable, allowing zero copy
                // cloning.
                let line = line.freeze();

                // Now, send the line to all other connected clients.
                for (addr, tx) in &self.state.lock().unwrap().clients {
                    // Don't send the message to ourselves.
                    if *addr != self.addr {
                        // The send only fails if the rx half has been dropped,
                        // however this is impossible as the `tx` half will be
                        // removed from the map before the `rx` is dropped.
                        tx.unbounded_send(line.clone()).unwrap();
                    }
                }

                // NOTE(review): debug probe only — and the state mutex is
                // locked three separate times in this branch; the probe and
                // the send below could share one lock scope.
                let is_closed = &self.state.lock().unwrap().server_tx.is_closed();
                println!("Is closed {:?}", is_closed);

                // Forward a copy of the line to the server-side channel.
                // NOTE(review): this only reaches a consumer if the future
                // reading `server_rx` is actually driven by the runtime.
                match &self.state.lock().unwrap().server_tx.unbounded_send(line.clone()) {
                    Ok(_) => println!("Message sent"),
                    Err(e) => println!("send error = {:?}", e)
                }
            } else {
                // EOF was reached. The remote client has disconnected. There is
                // nothing more to do.
                return Ok(Async::Ready(()));
            }
        }

        // As always, it is important to not just return `NotReady` without
        // ensuring an inner future also returned `NotReady`.
        //
        // We know we got a `NotReady` from either `self.rx` or `self.lines`, so
        // the contract is respected.
        Ok(Async::NotReady)
    }
}
impl Drop for Client { | |
fn drop(&mut self) { | |
self.state.lock().unwrap().clients | |
.remove(&self.addr); | |
} | |
} | |
impl Lines { | |
/// Create a new `Lines` codec backed by the socket | |
fn new(socket: TcpStream) -> Self { | |
Lines { | |
socket, | |
rd: BytesMut::new(), | |
wr: BytesMut::new(), | |
} | |
} | |
/// Buffer a line. | |
/// | |
/// This writes the line to an internal buffer. Calls to `poll_flush` will | |
/// attempt to flush this buffer to the socket. | |
fn buffer(&mut self, line: &[u8]) { | |
// Ensure the buffer has capacity. Ideally this would not be unbounded, | |
// but to keep the example simple, we will not limit this. | |
self.wr.reserve(line.len()); | |
// Push the line onto the end of the write buffer. | |
// | |
// The `put` function is from the `BufMut` trait. | |
self.wr.put(line); | |
} | |
/// Flush the write buffer to the socket | |
fn poll_flush(&mut self) -> Poll<(), io::Error> { | |
// As long as there is buffered data to write, try to write it. | |
while !self.wr.is_empty() { | |
// Try to write some bytes to the socket | |
let n = try_ready!(self.socket.poll_write(&self.wr)); | |
// As long as the wr is not empty, a successful write should | |
// never write 0 bytes. | |
assert!(n > 0); | |
// This discards the first `n` bytes of the buffer. | |
let _ = self.wr.split_to(n); | |
} | |
Ok(Async::Ready(())) | |
} | |
/// Read data from the socket. | |
/// | |
/// This only returns `Ready` when the socket has closed. | |
fn fill_read_buf(&mut self) -> Poll<(), io::Error> { | |
loop { | |
// Ensure the read buffer has capacity. | |
// | |
// This might result in an internal allocation. | |
self.rd.reserve(1024); | |
// Read data into the buffer. | |
let n = try_ready!(self.socket.read_buf(&mut self.rd)); | |
if n == 0 { | |
return Ok(Async::Ready(())); | |
} | |
} | |
} | |
} | |
impl Stream for Lines { | |
type Item = BytesMut; | |
type Error = io::Error; | |
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |
// First, read any new data that might have been received off the socket | |
let sock_closed = self.fill_read_buf()?.is_ready(); | |
// Now, try finding lines | |
let pos = self.rd.windows(2).enumerate() | |
.find(|&(_, bytes)| bytes == b"\r\n") | |
.map(|(i, _)| i); | |
if let Some(pos) = pos { | |
// Remove the line from the read buffer and set it to `line`. | |
let mut line = self.rd.split_to(pos + 2); | |
// Drop the trailing \r\n | |
line.split_off(pos); | |
// Return the line | |
return Ok(Async::Ready(Some(line))); | |
} | |
if sock_closed { | |
Ok(Async::Ready(None)) | |
} else { | |
Ok(Async::NotReady) | |
} | |
} | |
} | |
/// Spawn a task to manage the socket. | |
/// | |
/// This will read the first line from the socket to identify the client, then | |
/// add the client to the set of connected peers in the chat service. | |
fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) { | |
// Wrap the socket with the `Lines` codec that we wrote above. | |
// | |
// By doing this, we can operate at the line level instead of doing raw byte | |
// manipulation. | |
let lines = Lines::new(socket); | |
let peer = Client::new( | |
state, | |
lines).map_err(|e| { | |
println!("connection error = {:?}", e); | |
}); | |
// Spawn the task. Internally, this submits the task to a thread pool. | |
tokio::spawn(peer); | |
} | |
pub fn main() { | |
let (server_tx, server_rx) = mpsc::unbounded(); | |
let state = Arc::new(Mutex::new(Shared::new(server_tx))); | |
let addr = "127.0.0.1:6142".parse().unwrap(); | |
let listener = TcpListener::bind(&addr).unwrap(); | |
let server = listener.incoming().for_each(move |socket| { | |
// Spawn a task to process the connection | |
process(socket, state.clone()); | |
Ok(()) | |
}).map_err(|err| { | |
// All tasks must have an `Error` type of `()`. This forces error | |
// handling and helps avoid silencing failures. | |
// | |
// In our example, we are only going to log the error to STDOUT. | |
println!("accept error = {:?}", err); | |
}); | |
println!("server running on localhost:6142"); | |
let _messages = server_rx.for_each(|_| { | |
// process messages here | |
Ok(()) | |
}).map_err(|err| { | |
// All tasks must have an `Error` type of `()`. This forces error | |
// handling and helps avoid silencing failures. | |
// | |
// In our example, we are only going to log the error to STDOUT. | |
println!("accept error = {:?}", err); | |
}); | |
tokio::run(server); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment