Skip to content

Instantly share code, notes, and snippets.

@averagesecurityguy
Created October 21, 2020 22:48
Show Gist options
  • Save averagesecurityguy/b978da793bc373086c9f08903a5f4ac3 to your computer and use it in GitHub Desktop.
Save averagesecurityguy/b978da793bc373086c9f08903a5f4ac3 to your computer and use it in GitHub Desktop.
Multi-threaded File Reader
use std::fs::File;
use std::io::{self, BufRead};
use std::sync::mpsc;
use std::sync;
/// Single-producer, multi-consumer line reader over one file.
///
/// Lines are handed out to the registered receivers in rotation; each
/// receiver gets `Some(line)` values followed by a `None` end marker.
pub struct MultiReader {
    /// Path of the file to read.
    fname: String,
    /// Number of lines sent so far; selects the next receiver in rotation.
    count: sync::Arc<sync::Mutex<usize>>,
    /// Sending halves of the channels, one per registered receiver.
    chans: sync::Arc<sync::Mutex<Vec<mpsc::Sender<Option<String>>>>>,
}
impl MultiReader {
    /// Reads the file line by line and distributes the lines round-robin
    /// across all registered receivers, then sends `None` to every receiver
    /// to mark the end of the stream.
    ///
    /// The end marker is sent even when reading fails, so consumers waiting
    /// on a receiver are always released.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be opened or read, if a line has
    /// to be delivered while no receiver is registered
    /// (`ErrorKind::InvalidInput`), or if the receiver selected for a line
    /// has been dropped (`ErrorKind::BrokenPipe`).
    pub fn read(&mut self) -> io::Result<()> {
        let outcome = self.dispatch_lines();
        // Close unconditionally: an early return on error must not leave
        // consumers blocked waiting for an end marker that never arrives.
        self.close();
        outcome
    }

    /// Opens the file and forwards every line to `send`, stopping at the
    /// first I/O or delivery error.
    fn dispatch_lines(&mut self) -> io::Result<()> {
        let file = File::open(&self.fname)?;
        let reader = io::BufReader::new(file);
        for line in reader.lines() {
            self.send(line?)?;
        }
        Ok(())
    }

    /// Sends `item` to the next receiver in rotation and advances the counter.
    fn send(&mut self, item: String) -> io::Result<()> {
        let mut count = self.count.lock().expect("count mutex poisoned");
        let chans = self.chans.lock().expect("chans mutex poisoned");
        // Guard the modulo below: with no receivers it would divide by zero.
        if chans.is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "no receivers registered",
            ));
        }
        chans[*count % chans.len()]
            .send(Some(item))
            .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver disconnected"))?;
        // Only the value modulo the receiver count matters, so wrapping is fine.
        *count = count.wrapping_add(1);
        Ok(())
    }

    /// Registers a new consumer and returns the receiving half of its channel.
    ///
    /// The receiver yields `Some(line)` for each line routed to it and a
    /// final `None` once `read` has finished.
    pub fn add_receiver(&mut self) -> mpsc::Receiver<Option<String>> {
        let (tx, rx) = mpsc::channel();
        self.chans.lock().expect("chans mutex poisoned").push(tx);
        rx
    }

    /// Sends the `None` end marker to every registered receiver.
    fn close(&mut self) {
        let chans = self.chans.lock().expect("chans mutex poisoned");
        for tx in chans.iter() {
            // Best effort: a receiver that is already gone needs no marker.
            let _ = tx.send(None);
        }
    }
}
pub fn multi_reader(fname: String) -> MultiReader {
let count = sync::Arc::new(sync::Mutex::new(0));
let senders: Vec<mpsc::Sender<Option<String>>> = Vec::new();
let chans = sync::Arc::new(sync::Mutex::new(senders));
MultiReader { fname: fname, count: count, chans: chans }
}
/*
Proof of concept for a multi-threaded file reader. Allows for a single-producer
multi-consumer pattern on a file.
*/
use std::thread;
use multiread;
/// Number of receivers registered and worker threads spawned by `main`.
const N: usize = 10;
/// Handles one line delivered to a worker by printing it to standard output
/// followed by a newline.
fn process(s: String) {
    let line = s.as_str();
    println!("{}", line);
}
/// Demo entry point: distributes the lines of `src/main.rs` across `N`
/// worker threads, each of which passes its lines to `process`.
fn main() {
    let mut t = multiread::multi_reader(String::from("src/main.rs"));

    // Register one receiver per worker and start the worker right away;
    // all receivers exist before any line is read.
    let workers: Vec<_> = (0..N)
        .map(|_| {
            let rx = t.add_receiver();
            thread::spawn(move || {
                // Stop at the `None` end marker or once the channel disconnects.
                while let Ok(Some(line)) = rx.recv() {
                    process(line);
                }
            })
        })
        .collect();

    t.read().unwrap();

    // Wait for every worker; the join result is deliberately ignored.
    for worker in workers {
        let _ = worker.join();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment