Created
March 16, 2023 05:29
-
-
Save jdidion/3e47a83a0047f4291c1beb2f3a179a3f to your computer and use it in GitHub Desktop.
Write to a single file in parallel from multiple threads in Rust
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Requires a dependency on the `rustix` crate with the `fs` feature enabled:
// rustix = { version = "0.37.1", features = ["fs"] }
#[cfg(test)]
mod tests {
    use rustix::{fd::AsFd, fs::FallocateFlags};
    use std::{
        env,
        fs::{File, OpenOptions},
        io::{Read, Seek, SeekFrom, Write},
        thread,
    };

    // the number of concurrent threads from which to write
    // (each thread fills its chunks with its own index as a byte, so this must not exceed 256)
    const THREADS: u64 = 2;
    // the number of writes each thread will perform
    const WRITES_PER_THREAD: u64 = 5;
    // the size of each write in bytes
    const WRITE_LEN: u64 = 1024;

    /// Removes the wrapped path when dropped, so the scratch file is cleaned up even when an
    /// assertion in the test panics. Removal errors are ignored (best-effort cleanup).
    struct RemoveOnDrop<'a>(&'a std::path::Path);

    impl Drop for RemoveOnDrop<'_> {
        fn drop(&mut self) {
            let _ = std::fs::remove_file(self.0);
        }
    }

    /// Creates (or truncates) the file at `path` and reserves `size` bytes for it.
    ///
    /// # Errors
    ///
    /// Returns any error from creating the file or from the `fallocate` call.
    fn preallocate_file<P: AsRef<std::path::Path>>(path: P, size: u64) -> std::io::Result<()> {
        let file = File::create(path.as_ref())?;
        // `fallocate(2)` rejects a zero length with EINVAL; a freshly created file is already
        // empty, so there is nothing left to reserve in that case.
        if size == 0 {
            return Ok(());
        }
        // For filesystems that support the `fallocate(2)` system call, preallocation is done
        // quickly by allocating blocks and marking them as uninitialized, requiring no IO to the
        // data blocks.
        // NOTE(review): the original comment claimed that other filesystems fall back to
        // zero-filling the space; that is how glibc's `posix_fallocate` behaves -- confirm what
        // `rustix::fs::fallocate` does on the target platform before relying on it.
        rustix::fs::fallocate(file.as_fd(), FallocateFlags::empty(), 0, size)?;
        Ok(())
    }

    /// Writes interleaved chunks to one preallocated file from `THREADS` threads, then reads
    /// the file back sequentially and checks that every chunk holds the byte of the thread
    /// that owns it.
    #[test]
    fn test_par_write() -> std::io::Result<()> {
        let path = env::temp_dir().join("__test__");
        let _cleanup = RemoveOnDrop(&path);
        let total_writes = THREADS * WRITES_PER_THREAD;
        let chunk_len = usize::try_from(WRITE_LEN).expect("WRITE_LEN must fit in usize");
        preallocate_file(&path, total_writes * WRITE_LEN)?;

        thread::scope(|scope| -> std::io::Result<()> {
            // Each thread `t` in `0..THREADS` opens the same file in write mode and then
            // performs `WRITES_PER_THREAD` writes to the file. Each write `i` occurs at
            // offset `((i * THREADS) + t) * WRITE_LEN` and consists of `WRITE_LEN`
            // bytes all with value `t`.
            //
            // The handles are collected before any of them is joined so that every thread
            // has been spawned before we start waiting.
            let handles: Vec<_> = (0..THREADS)
                .map(|t| {
                    let path = &path;
                    scope.spawn(move || -> std::io::Result<()> {
                        let fill = u8::try_from(t)
                            .expect("THREADS must not exceed 256 so a thread index fits in a byte");
                        // Every thread opens its own handle, so each has an independent
                        // file offset and the seek/write pairs do not interfere.
                        let mut file = OpenOptions::new().write(true).open(path)?;
                        let buf = vec![fill; chunk_len];
                        for i in 0..WRITES_PER_THREAD {
                            file.seek(SeekFrom::Start(((i * THREADS) + t) * WRITE_LEN))?;
                            // `write` may stop after a partial write; `write_all` keeps
                            // going until the whole chunk is written.
                            file.write_all(&buf)?;
                        }
                        Ok(())
                    })
                })
                .collect();

            for handle in handles {
                handle.join().expect("writer thread panicked")?;
            }
            Ok(())
        })?;

        // Check that the file has the expected contents
        let mut file = File::open(&path)?;
        let mut buf = vec![0u8; chunk_len];
        for i in 0..total_writes {
            // `read_exact` fails instead of returning a short chunk.
            file.read_exact(&mut buf)?;
            // Chunk `i` was written by thread `i % THREADS`.
            let expected = u8::try_from(i % THREADS)
                .expect("THREADS must not exceed 256 so a thread index fits in a byte");
            assert!(
                buf.iter().all(|&byte| byte == expected),
                "chunk {i} should contain only byte {expected}"
            );
        }
        Ok(())
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment