-
-
Save lukebitts/7e030862f0cd91d50923d61032c23838 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate mio; | |
extern crate byteorder; | |
use mio::*; | |
use mio::tcp::*; | |
use mio::util::Slab; | |
use std::net::SocketAddr; | |
use std::io::{Error, ErrorKind}; | |
/// Progress of a single connection through the framing protocol.
///
/// A frame is a two-byte length header followed by that many payload bytes;
/// `ClientStatus::next` moves a connection from one state to the next.
#[derive(Debug, PartialEq, Eq)]
pub enum ClientStatus {
    /// No frame is in progress.
    Idle,
    /// Waiting for the two header bytes that announce the payload length.
    WaitingHeader,
    /// Collecting a payload: the announced length, then the bytes gathered so far.
    ReceivingData(u16, Vec<u8>),
}
impl ClientStatus { | |
pub fn next(self, buffer: &mut Vec<u8>) -> Result<Self, std::io::Error> { | |
use ClientStatus::*; | |
match self { | |
Idle => Ok(WaitingHeader), | |
WaitingHeader => { | |
if buffer.len() > 1 { | |
let data_len = convert_header(&buffer[0..2]); | |
buffer.drain(0..2); | |
Ok(ReceivingData(data_len, Vec::with_capacity(data_len as usize))) | |
} else { | |
Ok(self) | |
} | |
} | |
ReceivingData(data_len, mut data) => { | |
let remaining = data_len as usize - data.len(); | |
if buffer.len() >= remaining { | |
data.extend(buffer.drain(0..remaining)); | |
// We have everything, parse it! | |
try!(parse_data(std::mem::replace(&mut data, vec![]))); | |
Ok(Idle) | |
} else { | |
data.extend(buffer.drain(..)); | |
Ok(ReceivingData(data_len, data)) | |
} | |
} | |
} | |
} | |
} | |
#[derive(Debug)] | |
pub struct Client { | |
socket: TcpStream, | |
status: ClientStatus, | |
buffer: Vec<u8>, | |
} | |
impl Client { | |
pub fn new(socket: TcpStream) -> Client { | |
Client { | |
socket: socket, | |
status: ClientStatus::Idle, | |
buffer: Vec::new(), | |
} | |
} | |
fn buffer_data(&mut self) -> Result<(), std::io::Error> { | |
if let Some(len) = try!(self.socket.try_read_buf(&mut self.buffer)) { | |
let end = self.buffer.len() - len; | |
println!("Received raw: {:?}", &self.buffer[end..]); | |
} | |
Ok(()) | |
} | |
pub fn read(&mut self) -> Result<(), std::io::Error> { | |
try!(self.buffer_data()); | |
while (self.status != ClientStatus::WaitingHeader && self.buffer.len() > 0) || | |
(self.status == ClientStatus::WaitingHeader && self.buffer.len() > 1) { | |
let old_status = std::mem::replace(&mut self.status, ClientStatus::Idle); | |
self.status = try!(old_status.next(&mut self.buffer)); | |
} | |
Ok(()) | |
} | |
} | |
#[derive(Debug)] | |
struct Server { | |
socket: TcpListener, | |
clients: Slab<Client>, | |
} | |
const SERVER_TOKEN: Token = Token(0); | |
fn convert_header(mut header: &[u8]) -> u16 { | |
use byteorder::{LittleEndian, ReadBytesExt}; | |
header.read_u16::<LittleEndian>().expect("Not enough data to parse header") | |
} | |
/// Handles one complete payload: checks that it is valid UTF-8 and prints its
/// size in bytes together with the decoded text.
///
/// # Errors
/// Returns an `InvalidData` error when `data` is not valid UTF-8.
fn parse_data(data: Vec<u8>) -> Result<(), std::io::Error> {
    match std::str::from_utf8(&data) {
        Ok(text) => {
            println!("Received some data! Size: {}\n\tValue: {}", text.len(), text);
            Ok(())
        }
        Err(_) => Err(Error::new(ErrorKind::InvalidData, "Could not convert bytes.")),
    }
}
fn token_ready(tk: &mut Token, | |
socket: &mut TcpListener, | |
clients: &mut Slab<Client>, | |
event_loop: &mut EventLoop<Server>, | |
token: Token, | |
events: EventSet) | |
-> Result<(), std::io::Error> { | |
match token { | |
SERVER_TOKEN => { | |
let new_token; | |
if let Some((client_socket, _)) = try!(socket.accept()) { | |
new_token = try!(clients.insert(Client::new(client_socket)) | |
.ok() | |
.ok_or(Error::new(ErrorKind::ConnectionRefused, | |
"Max number of connections reached."))); | |
*tk = new_token; | |
} else { | |
return Err(Error::new(ErrorKind::Other, format!("{}: Socket was not ready yet.", token.0))); | |
} | |
println!("{}: New connection! Total connections: {}", new_token.0, clients.count()); | |
try!(event_loop.register(&clients[new_token].socket, | |
new_token, | |
EventSet::readable(), | |
PollOpt::edge() | PollOpt::oneshot())); | |
} | |
_ => { | |
if !events.is_hup() { | |
*tk = token; | |
let client: &mut Client = try!(clients.get_mut(token) | |
.ok_or(Error::new(ErrorKind::InvalidInput, | |
"Token was not registered. Ignoring."))); | |
try!(client.read()); | |
try!(event_loop.reregister(&client.socket, | |
token, | |
EventSet::readable() | EventSet::hup(), | |
PollOpt::edge() | PollOpt::oneshot())); | |
} else { | |
println!("{}: Removing connection", token.0); | |
clients.remove(token); | |
}; | |
} | |
} | |
Ok(()) | |
} | |
impl Handler for Server { | |
type Timeout = usize; | |
type Message = (); | |
fn ready(&mut self, event_loop: &mut EventLoop<Server>, token: Token, events: EventSet) { | |
let mut tk = SERVER_TOKEN; | |
if let Err(e) = token_ready(&mut tk, &mut self.socket, &mut self.clients, event_loop, token, events) { | |
println!("{}: Something went wrong: {}", tk.0, e); | |
if tk != SERVER_TOKEN { | |
self.clients.remove(tk); | |
} | |
} | |
} | |
} | |
fn main() { | |
let address = "127.0.0.1:10000".parse::<SocketAddr>().unwrap(); | |
let server_socket = TcpListener::bind(&address).unwrap(); | |
let mut event_loop = EventLoop::new().unwrap(); | |
let mut handler = Server { | |
clients: Slab::new_starting_at(Token(1), 10), | |
socket: server_socket, | |
}; | |
event_loop.register(&handler.socket, SERVER_TOKEN, EventSet::readable(), PollOpt::edge()) | |
.unwrap(); | |
event_loop.run(&mut handler).unwrap(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate byteorder; | |
use std::io::prelude::*; | |
use std::net::TcpStream; | |
/// Encodes a payload length as the two-byte little-endian frame header.
///
/// The bytes are extracted directly, so no temporary buffer is allocated and
/// nothing can fail.
fn convert_header(header: u16) -> [u8; 2] {
    // Low byte first: little-endian.
    [(header & 0x00ff) as u8, (header >> 8) as u8]
}
fn send_text(text: &str, stream: &mut TcpStream) { | |
if text.len() > (std::u16::MAX as usize) { | |
panic!(); | |
} | |
let len = convert_header(text.len() as u16); | |
let pct = String::from(text).into_bytes(); | |
let _ = stream.write(&len); | |
let _ = stream.write(&pct); | |
} | |
fn unify_texts(texts: Vec<&str>) -> Vec<u8> { | |
let mut data = vec![]; | |
for text in texts { | |
if text.len() > (std::u16::MAX as usize) { | |
panic!(); | |
} | |
let len = convert_header(text.len() as u16); | |
let pct = String::from(text).into_bytes(); | |
data.push(len[0]); | |
data.push(len[1]); | |
for p in pct { | |
data.push(p); | |
} | |
} | |
data | |
} | |
fn send_texts(texts: Vec<&str>, stream: &mut TcpStream) { | |
let data = unify_texts(texts); | |
let _ = stream.write(&data[..]); | |
} | |
fn send_texts_broken(texts: Vec<&str>, sleep_secs: u64, stream: &mut TcpStream) { | |
let data = unify_texts(texts); | |
let half = data.len() / 2; | |
let _ = stream.write(&data[0..half]); | |
std::thread::sleep(std::time::Duration::new(sleep_secs, 0)); | |
let _ = stream.write(&data[half..data.len()]); | |
} | |
fn main() { | |
println!("{}", std::u16::MAX); | |
let max = 1; | |
let mut v = vec![]; | |
for _ in 0..max { | |
v.push(std::thread::spawn(|| { | |
let mut stream = TcpStream::connect("127.0.0.1:10000").unwrap(); | |
send_texts_broken(vec!["a"], 1, &mut stream); | |
send_text("Test test test tests lots of tests", &mut stream); | |
send_texts_broken(vec!["123"], 1, &mut stream); | |
std::thread::sleep(std::time::Duration::new(1, 0)); | |
send_texts(vec!["e", "é", "c", "ç"], &mut stream); | |
})); | |
} | |
for t in v { | |
let _ = t.join(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment