Skip to content

Instantly share code, notes, and snippets.

@alexcrichton
Last active December 7, 2017 23:30
Show Gist options
  • Save alexcrichton/da80683060f405d6be0e06b426588886 to your computer and use it in GitHub Desktop.
Save alexcrichton/da80683060f405d6be0e06b426588886 to your computer and use it in GitHub Desktop.
// extern crate futures;
// extern crate futures_cpupool;
// extern crate tokio;
// extern crate tokio_io;
// use std::collections::HashMap;
// use std::iter;
// use std::env;
// use std::io::{Error, ErrorKind, BufReader};
// use std::sync::{Arc, Mutex};
//
// use futures::Future;
// use futures::future::Executor;
// use futures::stream::{self, Stream};
// use futures_cpupool::CpuPool;
// use tokio::net::TcpListener;
// use tokio_io::io;
// use tokio_io::AsyncRead;
//
// use futures::task::Task as WakeHandle;
// use futures::Async;
// Thin shim over the external `futures` crate. It re-exports that crate under
// the names the rest of this file is written against (`WakeHandle`,
// `AsyncResult`, `Task`, `CurrentThread`).
mod futures {
extern crate futures;
pub mod prelude {
// Glob re-export of the crate root; this is where `Async` (used by `Task`
// below) comes from.
pub use super::futures::*;
// The crate's `task::Task` is exposed under the name `WakeHandle`.
pub use super::futures::task::Task as WakeHandle;
// The crate's `Poll` is exposed under the name `AsyncResult`.
pub use super::futures::Poll as AsyncResult;
/// A unit of work that is advanced one step at a time through `tick`.
pub trait Task {
/// Performs one step of work, given the handle passed in by the executor.
/// The implementations in this file return `Async::Ready(())` once they are
/// finished and `Async::NotReady` when they want to be ticked again later.
fn tick(&mut self, wake: &WakeHandle) -> Async<()>;
}
}
pub mod executor {
// The crate's own `CurrentThread` is imported as `C` so the wrapper type
// below can reuse the name.
use super::futures::executor::CurrentThread as C;
use super::futures::future;
use super::futures::task;
use super::prelude::Task;
/// Unit struct whose associated functions forward to the crate's
/// `CurrentThread` executor (`C`).
pub struct CurrentThread;
impl CurrentThread {
/// Forwards to `C::run`. The argument `C::run` passes to its closure is
/// ignored and `f` is invoked with the placeholder value `0` instead; the
/// value produced by `f` is what this function returns.
// NOTE(review): presumably `C::run` also drives every task handed to
// `execute` before it returns - confirm against the futures version in use.
pub fn run<F, R>(f: F) -> R
where F: FnOnce(i32) -> R
{
C::run(|_| f(0))
}
/// Hands `task` to `C::execute`, adapted through `future::poll_fn`: every
/// call of the closure runs `task.tick` with the handle returned by
/// `task::current()` and wraps the outcome in `Ok`.
pub fn execute<T: Task + 'static>(mut task: T) {
C::execute(future::poll_fn(move || {
Ok(task.tick(&task::current()))
}))
}
}
}
}
// Thin shim over the external `tokio` crate: newtype wrappers around its TCP
// listener and stream whose I/O methods take an extra `&WakeHandle` argument.
mod tokio {
pub mod net {
extern crate tokio;
use std::io;
// Brought into scope for the `write` / `read` method calls on the wrapped
// stream below.
use std::io::{Write, Read};
use std::net::SocketAddr;
use futures::prelude::*;
/// Newtype around `tokio::net::TcpListener`.
pub struct TcpListener(tokio::net::TcpListener);
/// Newtype around `tokio::net::TcpStream`.
pub struct TcpStream(tokio::net::TcpStream);
impl TcpListener {
/// Binds the wrapped listener to `addr`, returning the bind error unchanged.
pub fn bind(addr: &SocketAddr) -> io::Result<TcpListener> {
tokio::net::TcpListener::bind(addr).map(TcpListener)
}
/// Calls `accept` on the wrapped listener and wraps the accepted stream in
/// this module's `TcpStream`; the peer address is passed through.
pub fn accept(&mut self, wake: &WakeHandle)
-> io::Result<(TcpStream, SocketAddr)>
{
// `wake` is a shared reference, so `drop` only discards the reference
// itself; the handle is otherwise unused here.
// NOTE(review): presumably the wrapped tokio type arranges its own wakeup
// for the current task - confirm.
drop(wake);
self.0.accept().map(|(a, b)| (TcpStream(a), b))
}
}
impl TcpStream {
/// Forwards to `write` on the wrapped stream and returns its result
/// unchanged.
pub fn write(&mut self, buf: &[u8], wake: &WakeHandle)
-> io::Result<usize>
{
// Dropping a reference has no effect on the handle; `wake` is unused.
drop(wake);
self.0.write(buf)
}
/// Forwards to `read` on the wrapped stream and returns its result
/// unchanged.
pub fn read(&mut self, buf: &mut [u8], wake: &WakeHandle)
-> io::Result<usize>
{
// Dropping a reference has no effect on the handle; `wake` is unused.
drop(wake);
self.0.read(buf)
}
}
}
}
use std::cell::{Cell, RefCell};
use std::collections::HashMap;
use std::env;
use std::io;
use std::net::SocketAddr;
use std::rc::Rc;
use std::str;
use futures::executor::CurrentThread;
use futures::prelude::*;
use tokio::net::{TcpStream, TcpListener};
/// The state that we'll be using `Rc` to share between all connected clients
/// of this server. This is what we'll use to broadcast messages from one
/// client to all of the other connected clients.
#[derive(Default)]
struct SharedState {
// Id that `Server::tick` assigns to the next accepted client; it is read and
// then advanced by one for every accepted connection.
next_id: Cell<u32>,
// One entry per connected client, keyed by client id. Entries are inserted
// by `Server::tick` and removed again when the `Client` is dropped.
clients: RefCell<HashMap<u32, SharedClient>>,
}
/// The part of a client that other clients can reach through `SharedState`.
struct SharedClient {
// Handle stored by the owning client at the start of `Client::process`. A
// broadcasting client takes it out and calls `notify()` on it after queueing
// a message.
wake: Option<WakeHandle>,
// Messages queued by other clients; the owning client drains them into its
// write buffer in `Client::process`.
message_queue: Vec<Message>,
}
/// One chat line travelling from a sender to every other client. It is cloned
/// once per recipient when broadcast.
#[derive(Clone)]
struct Message {
// Address of the client that sent the line.
from: SocketAddr,
// Text of the line with surrounding whitespace trimmed.
contents: String,
}
/// The accepting side of the chat server; its `Task` implementation turns
/// every accepted connection into a `Client` task.
struct Server {
// Listener that new connections are accepted from.
listener: TcpListener,
// State handed (as a cloned `Rc`) to every client this server creates.
shared: Rc<SharedState>,
}
impl Task for Server {
    /// Accepts every connection that is currently pending on the listener.
    ///
    /// Each accepted socket gets the next free client id, is registered in
    /// the shared client table and is then spawned as its own `Client` task.
    /// Returns `Async::NotReady` as soon as the listener reports
    /// `WouldBlock`; it never returns `Async::Ready`.
    ///
    /// # Panics
    ///
    /// Panics on any accept error other than `WouldBlock`.
    fn tick(&mut self, wake: &WakeHandle) -> Async<()> {
        loop {
            let accepted = self.listener.accept(wake);
            let (socket, addr) = match accepted {
                Ok(connection) => connection,
                Err(err) => {
                    // Nothing left to accept for now; wait to be ticked again.
                    if err.kind() == io::ErrorKind::WouldBlock {
                        return Async::NotReady;
                    }
                    panic!("error accepting a socket: {}", err)
                }
            };

            let id = self.shared.next_id.get();
            let client = Client {
                id,
                addr,
                socket,
                read_buffer: Vec::new(),
                write_buffer: Vec::new(),
                eof: false,
                shared: self.shared.clone(),
            };
            self.shared.next_id.set(id + 1);

            // Register the client before it starts running so that other
            // clients can queue messages for it right away.
            let entry = SharedClient {
                wake: None,
                message_queue: Vec::new(),
            };
            self.shared.clients.borrow_mut().insert(id, entry);

            CurrentThread::execute(client);
        }
    }
}
/// One connected chat client, run as its own task.
struct Client {
// Key of this client's entry in `SharedState::clients`.
id: u32,
// Connection to the client.
socket: TcpStream,
// Address of the client as reported by `accept`; used as the sender of its
// messages and in the disconnect log line.
addr: SocketAddr,
// Bytes received from the socket that have not been split into complete
// lines yet.
read_buffer: Vec<u8>,
// Rendered messages waiting to be written to the socket.
write_buffer: Vec<u8>,
// Set once a read returned zero bytes.
eof: bool,
// State shared with the server and all other clients.
shared: Rc<SharedState>,
}
impl Message {
    /// Appends this message to `dst` as a single line of the form
    /// `<sender address>: <contents>`, terminated by `\n`. Bytes already in
    /// `dst` are left untouched.
    fn render(&self, dst: &mut Vec<u8>) {
        let line = format!("{}: {}\n", self.from, self.contents);
        dst.extend_from_slice(line.as_bytes());
    }
}
impl Task for Client {
    /// Runs one round of `process` and decides whether the client is done.
    ///
    /// Returns `Async::Ready(())`, which disconnects the client, when
    /// `process` failed or when the read side reached end-of-file and nothing
    /// is left to write. Otherwise returns `Async::NotReady` so the client
    /// stays connected for future messages.
    fn tick(&mut self, wake: &WakeHandle) -> Async<()> {
        // Any error propagating out of `process` is fatal for this client.
        if let Err(err) = self.process(wake) {
            println!("disonnecting {}: {}", self.addr, err);
            return Async::Ready(());
        }

        let finished = self.eof && self.write_buffer.is_empty();
        if finished {
            Async::Ready(())
        } else {
            Async::NotReady
        }
    }
}
impl Client {
    /// Runs one round of work for this client.
    ///
    /// In order: stores `wake` in the shared state so other clients can wake
    /// this one, moves queued messages into the write buffer, performs as
    /// much socket I/O as possible in both directions, and finally broadcasts
    /// every complete line found in the read buffer.
    ///
    /// # Errors
    ///
    /// Returns the first fatal I/O error of the write side, otherwise of the
    /// read side. `WouldBlock` is not fatal and is swallowed on either side,
    /// so a blocked write no longer hides a fatal read error.
    fn process(&mut self, wake: &WakeHandle) -> io::Result<()> {
        {
            // First up let's make sure that if anyone sends us a message they
            // can wake us up.
            let mut clients = self.shared.clients.borrow_mut();
            let me = clients
                .get_mut(&self.id)
                .expect("a live client is always registered in the shared state");
            me.wake = Some(wake.clone());

            // Next take a look at any messages others may have sent us,
            // appending them to our write buffer which is flushed below.
            for message in me.message_queue.drain(..) {
                message.render(&mut self.write_buffer);
            }
        }

        // Both directions are always attempted, and each outcome is checked
        // on its own: combining them with `Result::and` would report a
        // `WouldBlock` from the write side in place of a fatal read error.
        let wrote = self.write_buffer(wake);
        let read = self.read_buffer(wake);
        Self::ignore_would_block(wrote)?;
        Self::ignore_would_block(read)?;

        // Split off every complete, newline-terminated line. Bytes after the
        // last newline stay buffered until the rest of the line arrives.
        // NOTE(review): the read buffer is only ever shrunk at a newline, so
        // a peer that never sends one makes it grow without bound.
        while let Some(end) = self.read_buffer.iter().position(|&b| b == b'\n') {
            let contents = str::from_utf8(&self.read_buffer[..end])
                .map(|line| line.trim().to_string());
            self.read_buffer.drain(..end + 1); // +1 for the newline as well

            // Lines that are not valid UTF-8 are silently discarded for now.
            if let Ok(contents) = contents {
                self.broadcast(contents);
            }
        }

        Ok(())
    }

    /// Maps a `WouldBlock` error to `Ok(())`; every other result is returned
    /// unchanged.
    fn ignore_would_block(result: io::Result<()>) -> io::Result<()> {
        match result {
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(()),
            other => other,
        }
    }

    /// Queues a message with the given `contents` for every other connected
    /// client and notifies those that have a wake handle stored.
    fn broadcast(&self, contents: String) {
        let message = Message { from: self.addr, contents };
        let mut clients = self.shared.clients.borrow_mut();
        for (&id, peer) in clients.iter_mut() {
            if id == self.id {
                continue;
            }
            peer.message_queue.push(message.clone());
            if let Some(handle) = peer.wake.take() {
                handle.notify();
            }
        }
    }

    /// Writes the pending write buffer to the socket until it is empty.
    ///
    /// # Errors
    ///
    /// Propagates any socket error, including `WouldBlock`. Returns a
    /// `WriteZero` error when the socket accepts zero bytes, instead of
    /// retrying the same write forever.
    fn write_buffer(&mut self, wake: &WakeHandle) -> io::Result<()> {
        while !self.write_buffer.is_empty() {
            let n = self.socket.write(&self.write_buffer, wake)?;
            if n == 0 {
                return Err(io::Error::new(
                    io::ErrorKind::WriteZero,
                    "failed to write buffered data to the client",
                ));
            }
            self.write_buffer.drain(..n);
        }
        Ok(())
    }

    /// Reads from the socket until it reports end-of-file (recorded in
    /// `self.eof`) or an error, which is propagated (including `WouldBlock`).
    fn read_buffer(&mut self, wake: &WakeHandle) -> io::Result<()> {
        while !self.eof {
            self.eof = self.read(wake)? == 0;
        }
        Ok(())
    }

    /// Performs a single socket read and appends the received bytes to the
    /// read buffer. Returns the number of bytes read; `0` means end-of-file.
    fn read(&mut self, wake: &WakeHandle) -> io::Result<usize> {
        // Read into an initialised scratch buffer and copy the filled part
        // over. This avoids handing the reader a slice over uninitialised
        // spare `Vec` capacity, which needed `unsafe` and is not permitted by
        // the `Read` contract.
        const READ_CHUNK: usize = 4096;
        let mut chunk = [0u8; READ_CHUNK];
        let n = self.socket.read(&mut chunk, wake)?;
        self.read_buffer.extend_from_slice(&chunk[..n]);
        Ok(n)
    }
}
impl Drop for Client {
    /// Removes this client's entry from the shared client table.
    fn drop(&mut self) {
        // Once the client is done and its task is deallocated, nobody will
        // process messages queued for it any more, so take it out of the
        // broadcast table.
        self.shared.clients.borrow_mut().remove(&self.id);
    }
}
fn main() {
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse().unwrap();
let shared = Rc::default();
// Create the TCP listener we'll accept connections on.
let server = Server {
listener: TcpListener::bind(&addr).unwrap(),
shared: shared,
};
println!("Listening on: {}", addr);
CurrentThread::run(|_| {
CurrentThread::execute(server);
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment