-
-
Save alexcrichton/da80683060f405d6be0e06b426588886 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// extern crate futures; | |
// extern crate futures_cpupool; | |
// extern crate tokio; | |
// extern crate tokio_io; | |
// use std::collections::HashMap; | |
// use std::iter; | |
// use std::env; | |
// use std::io::{Error, ErrorKind, BufReader}; | |
// use std::sync::{Arc, Mutex}; | |
// | |
// use futures::Future; | |
// use futures::future::Executor; | |
// use futures::stream::{self, Stream}; | |
// use futures_cpupool::CpuPool; | |
// use tokio::net::TcpListener; | |
// use tokio_io::io; | |
// use tokio_io::AsyncRead; | |
// | |
// use futures::task::Task as WakeHandle; | |
// use futures::Async; | |
mod futures { | |
extern crate futures; | |
pub mod prelude { | |
pub use super::futures::*; | |
pub use super::futures::task::Task as WakeHandle; | |
pub use super::futures::Poll as AsyncResult; | |
pub trait Task { | |
fn tick(&mut self, wake: &WakeHandle) -> Async<()>; | |
} | |
} | |
pub mod executor { | |
use super::futures::executor::CurrentThread as C; | |
use super::futures::future; | |
use super::futures::task; | |
use super::prelude::Task; | |
pub struct CurrentThread; | |
impl CurrentThread { | |
pub fn run<F, R>(f: F) -> R | |
where F: FnOnce(i32) -> R | |
{ | |
C::run(|_| f(0)) | |
} | |
pub fn execute<T: Task + 'static>(mut task: T) { | |
C::execute(future::poll_fn(move || { | |
Ok(task.tick(&task::current())) | |
})) | |
} | |
} | |
} | |
} | |
mod tokio { | |
pub mod net { | |
extern crate tokio; | |
use std::io; | |
use std::io::{Write, Read}; | |
use std::net::SocketAddr; | |
use futures::prelude::*; | |
pub struct TcpListener(tokio::net::TcpListener); | |
pub struct TcpStream(tokio::net::TcpStream); | |
impl TcpListener { | |
pub fn bind(addr: &SocketAddr) -> io::Result<TcpListener> { | |
tokio::net::TcpListener::bind(addr).map(TcpListener) | |
} | |
pub fn accept(&mut self, wake: &WakeHandle) | |
-> io::Result<(TcpStream, SocketAddr)> | |
{ | |
drop(wake); | |
self.0.accept().map(|(a, b)| (TcpStream(a), b)) | |
} | |
} | |
impl TcpStream { | |
pub fn write(&mut self, buf: &[u8], wake: &WakeHandle) | |
-> io::Result<usize> | |
{ | |
drop(wake); | |
self.0.write(buf) | |
} | |
pub fn read(&mut self, buf: &mut [u8], wake: &WakeHandle) | |
-> io::Result<usize> | |
{ | |
drop(wake); | |
self.0.read(buf) | |
} | |
} | |
} | |
} | |
use std::cell::{Cell, RefCell}; | |
use std::collections::HashMap; | |
use std::env; | |
use std::io; | |
use std::net::SocketAddr; | |
use std::rc::Rc; | |
use std::str; | |
use futures::executor::CurrentThread; | |
use futures::prelude::*; | |
use tokio::net::{TcpStream, TcpListener}; | |
// The state that we'll be using `Rc` to share between all connected clients of | |
// this server. This is what we'll use to broadcast messages from one client to | |
// all of the other connected clients. | |
#[derive(Default)] | |
struct SharedState { | |
next_id: Cell<u32>, | |
clients: RefCell<HashMap<u32, SharedClient>>, | |
} | |
struct SharedClient { | |
wake: Option<WakeHandle>, | |
message_queue: Vec<Message>, | |
} | |
/// One chat line travelling from a client to every other client.
#[derive(Clone)]
struct Message {
    /// Socket address of the client that sent the line.
    from: SocketAddr,
    /// Text of the line.
    contents: String,
}
struct Server { | |
listener: TcpListener, | |
shared: Rc<SharedState>, | |
} | |
impl Task for Server { | |
fn tick(&mut self, wake: &WakeHandle) -> Async<()> { | |
// Here we infinitely accept clients from our listener and the only | |
// reason we'll stop currently is if we hit an error. | |
loop { | |
let (socket, addr) = match self.listener.accept(wake) { | |
Ok(s) => s, | |
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { | |
return Async::NotReady | |
} | |
Err(e) => panic!("error accepting a socket: {}", e), | |
}; | |
let client = Client { | |
id: self.shared.next_id.get(), | |
addr, | |
socket, | |
read_buffer: Vec::new(), | |
write_buffer: Vec::new(), | |
eof: false, | |
shared: self.shared.clone(), | |
}; | |
self.shared.next_id.set(client.id + 1); | |
self.shared.clients.borrow_mut().insert(client.id, SharedClient { | |
wake: None, | |
message_queue: Vec::new(), | |
}); | |
CurrentThread::execute(client); | |
} | |
} | |
} | |
struct Client { | |
id: u32, | |
socket: TcpStream, | |
addr: SocketAddr, | |
read_buffer: Vec<u8>, | |
write_buffer: Vec<u8>, | |
eof: bool, | |
shared: Rc<SharedState>, | |
} | |
impl Message { | |
fn render(&self, dst: &mut Vec<u8>) { | |
dst.extend(format!("{}: {}\n", self.from, self.contents).into_bytes()); | |
} | |
} | |
impl Task for Client { | |
fn tick(&mut self, wake: &WakeHandle) -> Async<()> { | |
match self.process(wake) { | |
// If we got this far then we need to see if there's more for us to | |
// do or if we're done. If we've reached eof (the read side is done) | |
// and there's nothing for us to write then we disconnect the | |
// client. Otherwise we leave it connected for future messages. | |
Ok(()) => { | |
if self.eof && self.write_buffer.len() == 0 { | |
Async::Ready(()) | |
} else { | |
Async::NotReady | |
} | |
} | |
// Any error propagating here is a fatal error, so disconnect the | |
// client by returning that we're ready | |
Err(e) => { | |
println!("disonnecting {}: {}", self.addr, e); | |
Async::Ready(()) | |
} | |
} | |
} | |
} | |
impl Client { | |
fn process(&mut self, wake: &WakeHandle) -> io::Result<()> { | |
{ | |
// First up let's make sure that if anyone sends us a message they | |
// can wake us up. | |
let mut clients = self.shared.clients.borrow_mut(); | |
let me = clients.get_mut(&self.id).unwrap(); | |
me.wake = Some(wake.clone()); | |
// Next let's take a loook at any messages others may have sent us, | |
// appending them to our write buffer which we'll process below. | |
for message in me.message_queue.drain(..) { | |
message.render(&mut self.write_buffer); | |
} | |
} | |
// Next do as much I/O as we can for this client. We'll both write | |
// out all pending data on our client and also read as much data as we | |
// can. After doing both of these operations we'll only continue if we | |
// didn't hit a non-fatal I/O error. | |
if let Err(e) = self.write_buffer(wake).and(self.read_buffer(wake)) { | |
if e.kind() != io::ErrorKind::WouldBlock { | |
return Err(e) | |
} | |
} | |
// If we've got data we've read from the client, look for some messages | |
// to broadcast... | |
while self.read_buffer.len() > 0 { | |
// Try to find our delimiter, a newline | |
let i = match self.read_buffer.iter().position(|&b| b == b'\n') { | |
Some(i) => i, | |
None => break, | |
}; | |
// If we found a delimiter, try to convert that slice to a string. | |
// Regardless remove the bytes we're currently processing. | |
let contents = str::from_utf8(&self.read_buffer[..i]) | |
.map(|s| s.trim().to_string()); | |
self.read_buffer.drain(..i + 1); // +1 for the newline as well | |
// If the messages was actually valid utf-8 we broadcast it to | |
// everyone, otherwise we silently discard it for now. | |
if let Ok(contents) = contents { | |
let message = Message { from: self.addr, contents }; | |
let mut clients = self.shared.clients.borrow_mut(); | |
for (&id, client) in clients.iter_mut() { | |
if id == self.id { | |
continue | |
} | |
client.message_queue.push(message.clone()); | |
if let Some(wake) = client.wake.take() { | |
wake.notify(); | |
} | |
} | |
} | |
} | |
// Alright after all that we're good go go! Let's wait for the next | |
// message. | |
Ok(()) | |
} | |
fn write_buffer(&mut self, wake: &WakeHandle) -> io::Result<()> { | |
while self.write_buffer.len() > 0 { | |
let n = self.socket.write(&self.write_buffer, wake)?; | |
self.write_buffer.drain(..n); | |
} | |
Ok(()) | |
} | |
fn read_buffer(&mut self, wake: &WakeHandle) -> io::Result<()> { | |
while !self.eof { | |
self.eof = self.read(wake)? == 0; | |
} | |
Ok(()) | |
} | |
fn read(&mut self, wake: &WakeHandle) -> io::Result<usize> { | |
match self.socket.read(unsafe { slice_to_end(&mut self.read_buffer) }, wake)? { | |
0 => return Ok(0), | |
n => { | |
unsafe { | |
let len = self.read_buffer.len(); | |
self.read_buffer.set_len(len + n); | |
} | |
return Ok(n) | |
} | |
} | |
unsafe fn slice_to_end(v: &mut Vec<u8>) -> &mut [u8] { | |
use std::slice; | |
if v.capacity() == 0 { | |
v.reserve(16); | |
} | |
if v.capacity() == v.len() { | |
v.reserve(1); | |
} | |
slice::from_raw_parts_mut(v.as_mut_ptr().offset(v.len() as isize), | |
v.capacity() - v.len()) | |
} | |
} | |
} | |
impl Drop for Client { | |
fn drop(&mut self) { | |
// When our client is done and the future is deallocate be sure to | |
// unregister ourselves from the broadcast queue as we'll no longer be | |
// processing any of those messages! | |
let mut clients = self.shared.clients.borrow_mut(); | |
clients.remove(&self.id); | |
} | |
} | |
fn main() { | |
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string()); | |
let addr = addr.parse().unwrap(); | |
let shared = Rc::default(); | |
// Create the TCP listener we'll accept connections on. | |
let server = Server { | |
listener: TcpListener::bind(&addr).unwrap(), | |
shared: shared, | |
}; | |
println!("Listening on: {}", addr); | |
CurrentThread::run(|_| { | |
CurrentThread::execute(server); | |
}); | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment