Created
March 11, 2019 10:16
-
-
Save narthollis/fc0dafd54ba25eaf06d3f9f9ae4349b6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate exitcode; | |
extern crate ctrlc; | |
extern crate chrono; | |
use std::io; | |
use std::net::{TcpListener, SocketAddr, TcpStream}; | |
use std::sync::atomic::{AtomicBool, Ordering}; | |
use std::sync::Arc; | |
use std::io::{Read, Write}; | |
use core::borrow::Borrow; | |
use std::time; | |
use std::collections::{VecDeque, HashSet}; | |
use std::fs::File; | |
enum Task { | |
Read { addr: SocketAddr, sock: TcpStream, prev: Vec<u8> }, | |
Write { value: String, source: SocketAddr }, | |
Wait, | |
} | |
fn main() { | |
let config = match Config::new(std::env::args()) { | |
Ok(config) => config, | |
Err(err) => { | |
eprintln!("{}", err); | |
std::process::exit(exitcode::USAGE) | |
} | |
}; | |
println!("Output will be: {}", config.output); | |
let running = Arc::new(AtomicBool::new(true)); | |
let r = running.clone(); | |
ctrlc::set_handler(move || { | |
r.store(false, Ordering::SeqCst); | |
}).expect("Error setting Ctrl-C handler"); | |
let mut tasks: VecDeque<Task> = VecDeque::new(); | |
let mut reader = Reader::new(); | |
let mut writer = TimingWriter::new(config.output); | |
let listener = match TcpListener::bind("127.0.0.1:24123") { | |
Ok(l) => l, | |
Err(e) => { | |
eprintln!("Error binding to port: {}", e); | |
std::process::exit(exitcode::UNAVAILABLE); | |
} | |
}; | |
listener.set_nonblocking(true).expect("Unable to set non-blocking"); | |
let mut clients: HashSet<SocketAddr> = HashSet::new(); | |
while running.load(Ordering::SeqCst) { | |
tasks.append(&mut VecDeque::from(accept(&listener, &mut clients))); | |
let task: Task = match tasks.pop_front() { | |
Some(t) => t, | |
None => Task::Wait, | |
}; | |
let new_tasks = match task { | |
Task::Read { addr, sock, prev } => reader.read(addr, sock, prev, &mut clients), | |
Task::Write { value, source } => writer.write(value, source), | |
Task::Wait => wait(), | |
}; | |
tasks.append(&mut VecDeque::from(new_tasks)); | |
writer.log(tasks.len(), clients.len()) | |
} | |
} | |
const SEPARATOR: u8 = 0x1e; | |
fn wait() -> Vec<Task> { | |
//println!("Wait"); | |
std::thread::sleep(time::Duration::from_millis(1)); | |
return vec![]; | |
} | |
struct TimingWriter { | |
timestamp: time::Instant, | |
count: u64, | |
duration: time::Duration, | |
file: File, | |
} | |
impl TimingWriter { | |
fn new(output: String) -> TimingWriter { | |
return TimingWriter { | |
count: 0, | |
timestamp: time::Instant::now(), | |
duration: time::Duration::new(5, 0), | |
file: File::create(output).expect("Unable to open file"), | |
}; | |
} | |
fn write(&mut self, value: String, source: SocketAddr) -> Vec<Task> { | |
// println!("Received from: '{}', Value: '{}'", source, value); | |
self.file.write_all(format!("Received from: '{}', Value: '{}'\n", source, value).as_bytes()).expect("Unable to write line"); | |
self.count = self.count + 1; | |
return vec![]; | |
} | |
fn log(&mut self, tasks: usize, clients: usize) { | |
let elapsed = self.timestamp.elapsed(); | |
if elapsed > self.duration { | |
let date = chrono::Local::now(); | |
println!("[{}] Queue: {:6} - Clients {:4} - Processed {:6} / {} sec - {:5} /sec", date, tasks, clients, self.count, elapsed.as_secs(), self.count / elapsed.as_secs()); | |
self.count = 0; | |
self.timestamp = time::Instant::now(); | |
} | |
} | |
} | |
fn accept(listener: &TcpListener, clients: &mut HashSet<SocketAddr>) -> Vec<Task> { | |
match listener.accept() { | |
Ok((s, a)) => { | |
s.set_nonblocking(true).expect("Could not set socket non-blocking"); | |
clients.insert(a); | |
return vec![ | |
Task::Read { addr: a, sock: s, prev: vec![] }, | |
]; | |
} | |
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { | |
return vec![]; | |
} | |
Err(e) => panic!("encountered IO error: {}", e), | |
} | |
} | |
struct Reader { | |
buffer: [u8; 64], | |
} | |
impl Reader { | |
fn new() -> Reader { | |
return Reader { | |
buffer: [0; 64], | |
}; | |
} | |
fn read(&mut self, addr: SocketAddr, sock: TcpStream, prev: Vec<u8>, clients: &mut HashSet<SocketAddr>) -> Vec<Task> { | |
let count: usize = match sock.borrow().read(&mut self.buffer) { | |
Ok(c) => c, | |
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { | |
return vec![Task::Read { addr, sock, prev }]; | |
} | |
Err(e) => { | |
eprintln!("Error reading socket '{}': '{}'", addr, e); | |
return vec![]; | |
} | |
}; | |
if count == 0 { | |
clients.remove(&addr); | |
return vec![]; | |
} | |
let mut tasks: Vec<Task> = Vec::new(); | |
let full = [prev, self.buffer[..count].to_vec()].concat(); | |
let mut bits = full.rsplit(|b| *b == SEPARATOR); | |
match bits.next() { | |
Some(x) => tasks.insert(0, Task::Read { addr, sock, prev: x.to_vec() }), | |
None => tasks.insert(0, Task::Read { addr, sock, prev: vec![] }), | |
} | |
for line in bits { | |
tasks.insert(0, Task::Write { value: String::from(String::from_utf8_lossy(line)), source: addr }); | |
} | |
tasks.reverse(); | |
return tasks; | |
} | |
} | |
struct Config { | |
output: String, | |
} | |
impl Config { | |
pub fn new(mut args: std::env::Args) -> Result<Config, &'static str> { | |
args.next(); | |
let output = match args.next() { | |
Some(arg) => arg, | |
None => return Err("No output path provided."), | |
}; | |
return Ok(Config { output }); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment