Skip to content

Instantly share code, notes, and snippets.

@bovee
Created August 3, 2018 21:39
Show Gist options
  • Save bovee/e9c979799ebc6d0c29156c995518eec7 to your computer and use it in GitHub Desktop.
Save bovee/e9c979799ebc6d0c29156c995518eec7 to your computer and use it in GitHub Desktop.
use std::cell::UnsafeCell;
use std::io::Write;
// NOTE: `parking_lot` reportedly has better implementations of some of these sync primitives.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use errors::Result;
/// Number of `String` slots allocated in `OrderedWriter`'s buffer. It is also the
/// distance placed between `start_line` and `end_line` initially and after each flush.
const BUFFER_SIZE: usize = 1000;
/// Collects lines that may arrive out of order (the test feeds it from a thread
/// pool) and writes them to an output in line-number order via `synced_flush`.
pub struct OrderedWriter {
/// Line number that maps to `buffer[0]`; `write` indexes the buffer with
/// `line_number - start_line`, and `synced_flush` advances it after writing.
start_line: AtomicUsize,
/// Value of `n_lines` at which the flush flag is raised. Starts at `BUFFER_SIZE`,
/// is reset to `start_line + BUFFER_SIZE` after a flush, and is overridden by
/// `last_line`.
end_line: AtomicUsize,
/// Total number of `write` calls made so far (never reset).
n_lines: AtomicUsize,
/// Flag + condition variable: the flag is set to `true` when a flush may
/// proceed, `synced_flush` blocks on the condvar until then and sets it back to
/// `false` when done.
flush_lock: Arc<(Mutex<bool>, Condvar)>,
/// `BUFFER_SIZE` line slots, mutated through `&self` by `write`.
buffer: UnsafeCell<Vec<String>>,
}
// `UnsafeCell` is not `Sync`, so this impl is what allows an `OrderedWriter` to be
// shared between threads (e.g. behind an `Arc`).
// NOTE(review): nothing in the type enforces exclusive access to a slot; soundness
// appears to rely on callers giving every in-flight line a distinct line number
// and not writing while a flush is running — confirm.
unsafe impl Sync for OrderedWriter {}
impl OrderedWriter {
    /// Creates an empty writer whose first window covers line numbers
    /// `0..BUFFER_SIZE`.
    pub fn new() -> Self {
        OrderedWriter {
            start_line: AtomicUsize::new(0),
            end_line: AtomicUsize::new(BUFFER_SIZE),
            n_lines: AtomicUsize::new(0),
            flush_lock: Arc::new((Mutex::new(false), Condvar::new())),
            buffer: UnsafeCell::new(vec![String::new(); BUFFER_SIZE]),
        }
    }

    /// Stores `line` in the slot belonging to `line_number` and counts it.
    ///
    /// May be called from several threads at once, provided every call uses a
    /// different `line_number` inside the current window and no flush is running.
    ///
    /// # Panics
    ///
    /// Panics if `line_number` lies outside the current window
    /// (`start_line..start_line + BUFFER_SIZE`).
    pub fn write(&self, line_number: usize, line: String) {
        let slot = line_number - self.start_line.load(Ordering::SeqCst);
        // SAFETY: NOTE(review) — relies on the caller contract described above (distinct
        // slots per in-flight line, no concurrent flush); it is not enforced here.
        let buffer = unsafe { &mut *self.buffer.get() };
        buffer[slot] = line;
        // `fetch_add` returns the previous value, so add one to get the count that
        // includes this line. `SeqCst` (rather than `Relaxed`) publishes the slot
        // write above to whichever thread ends up triggering/performing the flush.
        let n_lines = self.n_lines.fetch_add(1, Ordering::SeqCst) + 1;
        self.release_lock(n_lines);
    }

    /// Declares that `line_number` lines exist in total, so that the final,
    /// partially filled window can be flushed.
    ///
    /// Call this after dispatching all lines and before the last
    /// `synced_flush(.., None)`.
    pub fn last_line(&self, line_number: usize) {
        // `SeqCst` on this store/load pair and on the `fetch_add`/load pair in `write`
        // guarantees that at least one side observes the other's update; with
        // `Relaxed` both could miss it and the flush would never be signalled.
        self.end_line.store(line_number, Ordering::SeqCst);
        let n_lines = self.n_lines.load(Ordering::SeqCst);
        self.release_lock(n_lines);
    }

    /// Raises the flush flag (and wakes the flusher) once `n_lines` has reached
    /// the end of the current window.
    #[inline]
    fn release_lock(&self, n_lines: usize) {
        // Lock-free fast path: most writes do not complete a window.
        if n_lines != self.end_line.load(Ordering::SeqCst) {
            return;
        }
        let (lock, cvar) = &*self.flush_lock;
        let mut flush = lock.lock().unwrap();
        // Re-check under the lock: `synced_flush` moves `end_line` while holding it,
        // so a caller that was delayed past a completed flush cannot leave a stale
        // `true` behind for the next window.
        if n_lines == self.end_line.load(Ordering::SeqCst) {
            *flush = true;
            // this could be `notify_all` too, but only the IO thread should be
            // calling `synced_flush` anyhow
            cvar.notify_one();
        }
    }

    /// Blocks until the current window is complete, writes its lines to `writer`
    /// in line-number order, and opens the next window.
    ///
    /// If `line_number` is `Some(n)` and `n` is still before the end of the
    /// current window, returns `Ok(())` immediately without writing anything.
    /// Pass `None` (after `last_line`) to flush whatever remains.
    ///
    /// # Errors
    ///
    /// Returns the error from `writer.write_all` if a line cannot be written; in
    /// that case the window is not advanced.
    pub fn synced_flush(&self, writer: &mut dyn Write, line_number: Option<usize>) -> Result<()> {
        if let Some(ln) = line_number {
            if ln < self.end_line.load(Ordering::SeqCst) {
                return Ok(());
            }
        }
        let (lock, cvar) = &*self.flush_lock;
        // halt until all the `write`s of this window are done
        {
            let mut flush = lock.lock().unwrap();
            while !*flush {
                flush = cvar.wait(flush).unwrap();
            }
        }
        // `self.n_lines` is the total number of lines written so far; subtracting
        // `start_line` gives the number of lines waiting in the buffer.
        let n_lines = self.n_lines.load(Ordering::SeqCst) - self.start_line.load(Ordering::SeqCst);
        // SAFETY: the flag was raised only after every line of this window was
        // counted, and the buffer is only read here, so a shared reference suffices.
        let buffer = unsafe { &*self.buffer.get() };
        for line in buffer.iter().take(n_lines) {
            writer.write_all(line.as_bytes())?;
        }
        // Advance the window and lower the flag in one critical section so that
        // `release_lock`'s re-check sees a consistent `end_line`.
        let mut flush = lock.lock().unwrap();
        let start_line = self.start_line.fetch_add(n_lines, Ordering::SeqCst) + n_lines;
        self.end_line
            .store(start_line + BUFFER_SIZE, Ordering::SeqCst);
        *flush = false;
        Ok(())
    }
}
impl Default for OrderedWriter {
fn default() -> Self {
OrderedWriter {
start_line: AtomicUsize::new(0),
end_line: AtomicUsize::new(BUFFER_SIZE),
n_lines: AtomicUsize::new(0),
flush_lock: Arc::new((Mutex::new(false), Condvar::new())),
buffer: UnsafeCell::new(vec![String::new(); BUFFER_SIZE]),
}
}
}
/// Ten lines are written from a thread pool; after `last_line` and a final
/// flush they must appear in line-number order.
#[test]
fn test_ordered_writer() {
    extern crate threadpool;
    let pool = threadpool::ThreadPool::default();
    let shared = Arc::new(OrderedWriter::new());
    let mut sink: Vec<u8> = Vec::new();
    let total: usize = 10;
    for i in 0..total {
        // Hand line `i` to a worker ...
        let worker = Arc::clone(&shared);
        pool.execute(move || worker.write(i, format!("{},", i)));
        // ... and give the writer a chance to flush once `i + 1` lines are dispatched.
        shared.synced_flush(&mut sink, Some(i + 1)).unwrap();
    }
    // All lines dispatched: announce the total and flush the remainder.
    shared.last_line(total);
    shared.synced_flush(&mut sink, None).unwrap();
    assert_eq!(sink, b"0,1,2,3,4,5,6,7,8,9,");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment