Skip to content

Instantly share code, notes, and snippets.

@jdidion
Created March 16, 2023 05:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jdidion/3e47a83a0047f4291c1beb2f3a179a3f to your computer and use it in GitHub Desktop.
Save jdidion/3e47a83a0047f4291c1beb2f3a179a3f to your computer and use it in GitHub Desktop.
Write to a single file in parallel from multiple threads in Rust
// Requires the `rustix` crate with its `fs` feature enabled; add to Cargo.toml:
// rustix = { version = "0.37.1", features = ["fs"] }
#[cfg(test)]
mod tests {
    use rustix::{fd::AsFd, fs::FallocateFlags};
    use std::{
        env,
        fs::{self, File, OpenOptions},
        io::{Read, Seek, SeekFrom, Write},
        path::Path,
        thread,
    };

    /// Number of concurrent threads writing to the shared file.
    const THREADS: u64 = 2;
    /// Number of writes each thread performs.
    const WRITES_PER_THREAD: u64 = 5;
    /// Size of each write, in bytes.
    const WRITE_LEN: u64 = 1024;

    /// Creates (or truncates) the file at `path` and reserves `size` bytes for it via
    /// `fallocate`.
    ///
    /// # Errors
    ///
    /// Returns any error from creating the file or from the `fallocate` call.
    /// NOTE(review): on Linux, rustix issues the `fallocate(2)` system call, which can fail
    /// (e.g. `EOPNOTSUPP`) on filesystems that do not support it instead of falling back to
    /// writing zeroes. Confirm whether a `File::set_len` fallback is wanted.
    fn preallocate_file<P: AsRef<Path>>(path: P, size: u64) -> std::io::Result<()> {
        let file = File::create(path.as_ref())?;
        // With no flags (in particular without `KEEP_SIZE`), fallocate(2) both reserves the
        // blocks and extends the file length to `size`.
        rustix::fs::fallocate(file.as_fd(), FallocateFlags::empty(), 0, size)?;
        Ok(())
    }

    /// Writes thread `t`'s chunks into the existing file at `path`.
    ///
    /// Write `i` (for `i` in `0..WRITES_PER_THREAD`) goes to offset
    /// `((i * THREADS) + t) * WRITE_LEN` and consists of `WRITE_LEN` bytes all equal to `t`.
    /// The threads' chunks therefore interleave without overlapping.
    ///
    /// # Panics
    ///
    /// Panics if `t` does not fit in a `u8` (i.e. `THREADS > 256`).
    fn write_stripes(path: &Path, t: u64) -> std::io::Result<()> {
        // Each thread opens its own handle, so each has its own file offset. Seeking in one
        // thread therefore does not move another thread's write position.
        let mut file = OpenOptions::new().write(true).open(path)?;
        let fill = u8::try_from(t).expect("thread index must fit in a u8 fill byte");
        let chunk = vec![fill; WRITE_LEN as usize];
        for i in 0..WRITES_PER_THREAD {
            file.seek(SeekFrom::Start((i * THREADS + t) * WRITE_LEN))?;
            // `write` may write only part of the buffer; `write_all` keeps going until the
            // whole chunk has been written or an error occurs.
            file.write_all(&chunk)?;
        }
        Ok(())
    }

    /// Writes interleaved chunks into one preallocated file from `THREADS` threads at once.
    /// It then reads the file back and checks that each chunk holds only the writing
    /// thread's index.
    #[test]
    fn test_par_write() -> std::io::Result<()> {
        // Include the process id so that concurrent runs of this test binary do not share
        // (and clobber) the same temp file.
        let path = env::temp_dir().join(format!("__test_par_write_{}__", std::process::id()));
        let total_writes = THREADS * WRITES_PER_THREAD;
        preallocate_file(&path, total_writes * WRITE_LEN)?;

        let shared_path = path.as_path();
        thread::scope(|scope| {
            let handles: Vec<_> = (0..THREADS)
                .map(|t| scope.spawn(move || write_stripes(shared_path, t)))
                .collect();
            // Join every writer. A panicking writer fails the test; the first I/O error is
            // propagated to the caller.
            handles
                .into_iter()
                .map(|handle| handle.join().expect("writer thread panicked"))
                .collect::<std::io::Result<()>>()
        })?;

        // Read the file back chunk by chunk; chunk `i` was written by thread `i % THREADS`.
        let mut file = File::open(&path)?;
        assert_eq!(file.metadata()?.len(), total_writes * WRITE_LEN);
        let mut buf = [u8::MAX; WRITE_LEN as usize];
        for i in 0..total_writes {
            // `read_exact` returns an error on a short read instead of silently filling
            // only part of the buffer.
            file.read_exact(&mut buf)?;
            let expected = i % THREADS;
            assert!(
                buf.iter().all(|&byte| u64::from(byte) == expected),
                "chunk {i} should contain only byte value {expected}"
            );
        }

        // Close the handle before removing the file (required on some platforms).
        drop(file);
        fs::remove_file(&path)?;
        Ok(())
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment