Skip to content

Instantly share code, notes, and snippets.

@lukebitts
Last active May 28, 2016 19:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lukebitts/7e030862f0cd91d50923d61032c23838 to your computer and use it in GitHub Desktop.
Save lukebitts/7e030862f0cd91d50923d61032c23838 to your computer and use it in GitHub Desktop.
extern crate mio;
extern crate byteorder;
use mio::*;
use mio::tcp::*;
use mio::util::Slab;
use std::net::SocketAddr;
use std::io::{Error, ErrorKind};
#[derive(Debug, PartialEq, Eq)]
pub enum ClientStatus {
Idle,
WaitingHeader,
ReceivingData(u16, Vec<u8>),
}
impl ClientStatus {
pub fn next(self, buffer: &mut Vec<u8>) -> Result<Self, std::io::Error> {
use ClientStatus::*;
match self {
Idle => Ok(WaitingHeader),
WaitingHeader => {
if buffer.len() > 1 {
let data_len = convert_header(&buffer[0..2]);
buffer.drain(0..2);
Ok(ReceivingData(data_len, Vec::with_capacity(data_len as usize)))
} else {
Ok(self)
}
}
ReceivingData(data_len, mut data) => {
let remaining = data_len as usize - data.len();
if buffer.len() >= remaining {
data.extend(buffer.drain(0..remaining));
// We have everything, parse it!
try!(parse_data(std::mem::replace(&mut data, vec![])));
Ok(Idle)
} else {
data.extend(buffer.drain(..));
Ok(ReceivingData(data_len, data))
}
}
}
}
}
#[derive(Debug)]
pub struct Client {
socket: TcpStream,
status: ClientStatus,
buffer: Vec<u8>,
}
impl Client {
pub fn new(socket: TcpStream) -> Client {
Client {
socket: socket,
status: ClientStatus::Idle,
buffer: Vec::new(),
}
}
fn buffer_data(&mut self) -> Result<(), std::io::Error> {
if let Some(len) = try!(self.socket.try_read_buf(&mut self.buffer)) {
let end = self.buffer.len() - len;
println!("Received raw: {:?}", &self.buffer[end..]);
}
Ok(())
}
pub fn read(&mut self) -> Result<(), std::io::Error> {
try!(self.buffer_data());
while (self.status != ClientStatus::WaitingHeader && self.buffer.len() > 0) ||
(self.status == ClientStatus::WaitingHeader && self.buffer.len() > 1) {
let old_status = std::mem::replace(&mut self.status, ClientStatus::Idle);
self.status = try!(old_status.next(&mut self.buffer));
}
Ok(())
}
}
#[derive(Debug)]
struct Server {
socket: TcpListener,
clients: Slab<Client>,
}
const SERVER_TOKEN: Token = Token(0);
fn convert_header(mut header: &[u8]) -> u16 {
use byteorder::{LittleEndian, ReadBytesExt};
header.read_u16::<LittleEndian>().expect("Not enough data to parse header")
}
fn parse_data(data: Vec<u8>) -> Result<(), std::io::Error> {
let data = try!(std::str::from_utf8(&data[..])
.map_err(|_| Error::new(ErrorKind::InvalidData, "Could not convert bytes.")));
println!("Received some data! Size: {}\n\tValue: {}", data.len(), data);
Ok(())
}
fn token_ready(tk: &mut Token,
socket: &mut TcpListener,
clients: &mut Slab<Client>,
event_loop: &mut EventLoop<Server>,
token: Token,
events: EventSet)
-> Result<(), std::io::Error> {
match token {
SERVER_TOKEN => {
let new_token;
if let Some((client_socket, _)) = try!(socket.accept()) {
new_token = try!(clients.insert(Client::new(client_socket))
.ok()
.ok_or(Error::new(ErrorKind::ConnectionRefused,
"Max number of connections reached.")));
*tk = new_token;
} else {
return Err(Error::new(ErrorKind::Other, format!("{}: Socket was not ready yet.", token.0)));
}
println!("{}: New connection! Total connections: {}", new_token.0, clients.count());
try!(event_loop.register(&clients[new_token].socket,
new_token,
EventSet::readable(),
PollOpt::edge() | PollOpt::oneshot()));
}
_ => {
if !events.is_hup() {
*tk = token;
let client: &mut Client = try!(clients.get_mut(token)
.ok_or(Error::new(ErrorKind::InvalidInput,
"Token was not registered. Ignoring.")));
try!(client.read());
try!(event_loop.reregister(&client.socket,
token,
EventSet::readable() | EventSet::hup(),
PollOpt::edge() | PollOpt::oneshot()));
} else {
println!("{}: Removing connection", token.0);
clients.remove(token);
};
}
}
Ok(())
}
impl Handler for Server {
type Timeout = usize;
type Message = ();
fn ready(&mut self, event_loop: &mut EventLoop<Server>, token: Token, events: EventSet) {
let mut tk = SERVER_TOKEN;
if let Err(e) = token_ready(&mut tk, &mut self.socket, &mut self.clients, event_loop, token, events) {
println!("{}: Something went wrong: {}", tk.0, e);
if tk != SERVER_TOKEN {
self.clients.remove(tk);
}
}
}
}
fn main() {
let address = "127.0.0.1:10000".parse::<SocketAddr>().unwrap();
let server_socket = TcpListener::bind(&address).unwrap();
let mut event_loop = EventLoop::new().unwrap();
let mut handler = Server {
clients: Slab::new_starting_at(Token(1), 10),
socket: server_socket,
};
event_loop.register(&handler.socket, SERVER_TOKEN, EventSet::readable(), PollOpt::edge())
.unwrap();
event_loop.run(&mut handler).unwrap();
}
extern crate byteorder;
use std::io::prelude::*;
use std::net::TcpStream;
fn convert_header(header: u16) -> [u8; 2] {
use byteorder::{LittleEndian, WriteBytesExt};
let mut wtr = vec![];
wtr.write_u16::<LittleEndian>(header).unwrap();
[wtr[0], wtr[1]]
}
fn send_text(text: &str, stream: &mut TcpStream) {
if text.len() > (std::u16::MAX as usize) {
panic!();
}
let len = convert_header(text.len() as u16);
let pct = String::from(text).into_bytes();
let _ = stream.write(&len);
let _ = stream.write(&pct);
}
fn unify_texts(texts: Vec<&str>) -> Vec<u8> {
let mut data = vec![];
for text in texts {
if text.len() > (std::u16::MAX as usize) {
panic!();
}
let len = convert_header(text.len() as u16);
let pct = String::from(text).into_bytes();
data.push(len[0]);
data.push(len[1]);
for p in pct {
data.push(p);
}
}
data
}
fn send_texts(texts: Vec<&str>, stream: &mut TcpStream) {
let data = unify_texts(texts);
let _ = stream.write(&data[..]);
}
fn send_texts_broken(texts: Vec<&str>, sleep_secs: u64, stream: &mut TcpStream) {
let data = unify_texts(texts);
let half = data.len() / 2;
let _ = stream.write(&data[0..half]);
std::thread::sleep(std::time::Duration::new(sleep_secs, 0));
let _ = stream.write(&data[half..data.len()]);
}
fn main() {
println!("{}", std::u16::MAX);
let max = 1;
let mut v = vec![];
for _ in 0..max {
v.push(std::thread::spawn(|| {
let mut stream = TcpStream::connect("127.0.0.1:10000").unwrap();
send_texts_broken(vec!["a"], 1, &mut stream);
send_text("Test test test tests lots of tests", &mut stream);
send_texts_broken(vec!["123"], 1, &mut stream);
std::thread::sleep(std::time::Duration::new(1, 0));
send_texts(vec!["e", "é", "c", "ç"], &mut stream);
}));
}
for t in v {
let _ = t.join();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment