Skip to content

Instantly share code, notes, and snippets.

@narthollis
Created March 11, 2019 10:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save narthollis/fc0dafd54ba25eaf06d3f9f9ae4349b6 to your computer and use it in GitHub Desktop.
Save narthollis/fc0dafd54ba25eaf06d3f9f9ae4349b6 to your computer and use it in GitHub Desktop.
extern crate exitcode;
extern crate ctrlc;
extern crate chrono;
use std::io;
use std::net::{TcpListener, SocketAddr, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::io::{Read, Write};
use core::borrow::Borrow;
use std::time;
use std::collections::{VecDeque, HashSet};
use std::fs::File;
/// A unit of work held in the queue that `main` drains one item per loop iteration.
enum Task {
/// Try to read from `sock`, the connection accepted from `addr`. `prev` carries bytes that
/// were received earlier but have not yet been terminated by a separator byte.
Read { addr: SocketAddr, sock: TcpStream, prev: Vec<u8> },
/// Write `value`, attributed to the peer `source`, to the output file.
Write { value: String, source: SocketAddr },
/// Placeholder used when the queue is empty; handled by sleeping briefly.
Wait,
}
/// Entry point: a single-threaded polling loop that accepts TCP clients on
/// `127.0.0.1:24123`, reads records from them and hands each record to the file writer.
///
/// The output path comes from the command line (see `Config::new`). The loop runs until the
/// Ctrl-C handler clears the shared `running` flag.
///
/// Exits with `exitcode::USAGE` when the arguments are invalid and with
/// `exitcode::UNAVAILABLE` when the listening socket cannot be bound.
fn main() {
    let config = Config::new(std::env::args()).unwrap_or_else(|message| {
        eprintln!("{}", message);
        std::process::exit(exitcode::USAGE)
    });
    println!("Output will be: {}", config.output);

    // Shutdown flag shared with the Ctrl-C handler, which flips it to `false`.
    let running = Arc::new(AtomicBool::new(true));
    let stop_flag = Arc::clone(&running);
    ctrlc::set_handler(move || stop_flag.store(false, Ordering::SeqCst))
        .expect("Error setting Ctrl-C handler");

    let mut tasks: VecDeque<Task> = VecDeque::new();
    let mut reader = Reader::new();
    let mut writer = TimingWriter::new(config.output);

    let listener = TcpListener::bind("127.0.0.1:24123").unwrap_or_else(|e| {
        eprintln!("Error binding to port: {}", e);
        std::process::exit(exitcode::UNAVAILABLE)
    });
    listener
        .set_nonblocking(true)
        .expect("Unable to set non-blocking");

    let mut clients: HashSet<SocketAddr> = HashSet::new();

    // NOTE(review): a `Read` task re-queues itself when its socket has no data, so while any
    // client is connected the queue is presumably never empty and `Task::Wait` (the only
    // sleeping step) is not reached -- confirm whether this busy polling is intended.
    while running.load(Ordering::SeqCst) {
        // `accept` polls the listener once, so at most one new client joins per pass.
        tasks.extend(accept(&listener, &mut clients));

        // Run exactly one task per pass; an empty queue degrades to a short sleep.
        let follow_ups = match tasks.pop_front().unwrap_or(Task::Wait) {
            Task::Read { addr, sock, prev } => reader.read(addr, sock, prev, &mut clients),
            Task::Write { value, source } => writer.write(value, source),
            Task::Wait => wait(),
        };
        tasks.extend(follow_ups);

        writer.log(tasks.len(), clients.len());
    }
}
/// Byte that delimits records in the incoming stream (0x1E, the ASCII "record separator"
/// control character); `Reader::read` splits received data on it.
const SEPARATOR: u8 = 0x1e;
/// Handles `Task::Wait`: sleeps for one millisecond and schedules no follow-up tasks.
fn wait() -> Vec<Task> {
    let pause = time::Duration::from_millis(1);
    std::thread::sleep(pause);
    Vec::new()
}
/// Writes received records to a file and periodically prints throughput statistics.
struct TimingWriter {
    /// Start of the current reporting window.
    timestamp: time::Instant,
    /// Number of records written during the current reporting window.
    count: u64,
    /// Time that must elapse before `log` prints a report and starts a new window.
    duration: time::Duration,
    /// Destination file for the records.
    file: File,
}

impl TimingWriter {
    /// Creates the file at `output` and starts the first five-second reporting window.
    ///
    /// # Panics
    /// Panics with "Unable to open file" if the file cannot be created.
    fn new(output: String) -> TimingWriter {
        TimingWriter {
            count: 0,
            timestamp: time::Instant::now(),
            duration: time::Duration::from_secs(5),
            file: File::create(output).expect("Unable to open file"),
        }
    }

    /// Writes one line describing `value` and the peer `source` to the file and counts it.
    ///
    /// Always returns an empty task list.
    ///
    /// # Panics
    /// Panics with "Unable to write line" if the write fails.
    fn write(&mut self, value: String, source: SocketAddr) -> Vec<Task> {
        let line = format!("Received from: '{}', Value: '{}'\n", source, value);
        self.file
            .write_all(line.as_bytes())
            .expect("Unable to write line");
        self.count += 1;
        Vec::new()
    }

    /// Prints a statistics line once the reporting window has elapsed, then resets the
    /// record counter and starts a new window. Does nothing while the window is still open.
    ///
    /// `tasks` is the current queue length and `clients` the number of tracked clients.
    fn log(&mut self, tasks: usize, clients: usize) {
        let elapsed = self.timestamp.elapsed();
        if elapsed <= self.duration {
            return;
        }

        // With the five-second window set in `new`, `seconds` is at least 5 here, so the
        // division below cannot hit zero.
        let seconds = elapsed.as_secs();
        let now = chrono::Local::now();
        println!(
            "[{}] Queue: {:6} - Clients {:4} - Processed {:6} / {} sec - {:5} /sec",
            now,
            tasks,
            clients,
            self.count,
            seconds,
            self.count / seconds
        );

        self.count = 0;
        self.timestamp = time::Instant::now();
    }
}
/// Polls `listener` once for a pending connection.
///
/// When a client is waiting, its socket is switched to non-blocking mode, its address is
/// recorded in `clients`, and a single `Task::Read` with an empty carry-over buffer is
/// returned. When nothing is pending (`WouldBlock`), the result is empty.
///
/// # Panics
/// Panics on any other accept error, or if the new socket cannot be made non-blocking.
fn accept(listener: &TcpListener, clients: &mut HashSet<SocketAddr>) -> Vec<Task> {
    let (stream, peer) = match listener.accept() {
        Ok(connection) => connection,
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => return Vec::new(),
        Err(e) => panic!("encountered IO error: {}", e),
    };

    stream
        .set_nonblocking(true)
        .expect("Could not set socket non-blocking");
    clients.insert(peer);

    vec![Task::Read { addr: peer, sock: stream, prev: Vec::new() }]
}
/// Reads from client sockets and splits the received bytes into `SEPARATOR`-delimited records.
struct Reader {
    /// Scratch buffer reused for every socket read (at most 64 bytes per call).
    buffer: [u8; 64],
}

impl Reader {
    /// Creates a reader with a zeroed scratch buffer.
    fn new() -> Reader {
        Reader { buffer: [0; 64] }
    }

    /// Performs one non-blocking read on `sock` and turns the outcome into follow-up tasks.
    ///
    /// * `addr`    - peer address of `sock`; used as the record source and as the key in `clients`.
    /// * `sock`    - the client connection; moved into the next `Task::Read` if it stays open.
    /// * `prev`    - bytes received earlier that were not yet terminated by `SEPARATOR`.
    /// * `clients` - set of connected peers; `addr` is removed once the connection is dropped.
    ///
    /// Returns:
    /// * no data available (`WouldBlock`): the same `Task::Read`, unchanged, to retry later;
    /// * read error or end of stream: nothing. The socket is dropped, `addr` is removed from
    ///   `clients`, and any unterminated bytes in `prev` are discarded;
    /// * data received: one `Task::Read` carrying the unterminated tail, followed by one
    ///   `Task::Write` per complete record, in the order the records arrived.
    fn read(&mut self, addr: SocketAddr, sock: TcpStream, prev: Vec<u8>, clients: &mut HashSet<SocketAddr>) -> Vec<Task> {
        // `borrow()` yields a `&TcpStream`, which implements `Read`, so `sock` itself does
        // not have to be mutable.
        let count = match sock.borrow().read(&mut self.buffer) {
            Ok(n) => n,
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                return vec![Task::Read { addr, sock, prev }];
            }
            Err(e) => {
                eprintln!("Error reading socket '{}': '{}'", addr, e);
                // The connection is abandoned here, so stop counting it as a client.
                clients.remove(&addr);
                return Vec::new();
            }
        };

        // A zero-length read means the peer closed the connection.
        if count == 0 {
            clients.remove(&addr);
            return Vec::new();
        }

        // Append the new bytes to the carried-over partial record.
        let mut pending = prev;
        pending.extend_from_slice(&self.buffer[..count]);

        let mut segments = pending.split(|byte| *byte == SEPARATOR);
        // `split` always yields at least one segment; the last one is whatever follows the
        // final separator, i.e. a record that is not complete yet.
        let tail = segments
            .next_back()
            .map(|bytes| bytes.to_vec())
            .unwrap_or_default();

        // Keep the read first, then queue the completed records oldest-first so they are
        // written in the order they were received.
        let mut tasks = vec![Task::Read { addr, sock, prev: tail }];
        tasks.extend(segments.map(|record| Task::Write {
            value: String::from_utf8_lossy(record).into_owned(),
            source: addr,
        }));
        tasks
    }
}
/// Run-time configuration taken from the command line.
struct Config {
/// Path of the output file, taken from the command-line argument that follows the first one.
output: String,
}
impl Config {
/// Builds the configuration from the process arguments.
///
/// The first item of `args` is skipped and the one after it becomes the output path.
///
/// # Errors
/// Returns `"No output path provided."` when `args` has no second item.
pub fn new(mut args: std::env::Args) -> Result<Config, &'static str> {
    args.nth(1)
        .map(|output| Config { output })
        .ok_or("No output path provided.")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment