Skip to content

Instantly share code, notes, and snippets.

@mooreniemi
Created May 19, 2024 19:31
Show Gist options
  • Save mooreniemi/8d37e5be1729b308ea99f9b25d114fc7 to your computer and use it in GitHub Desktop.
Save mooreniemi/8d37e5be1729b308ea99f9b25d114fc7 to your computer and use it in GitHub Desktop.
`SharedFile` using `unsafe`
[package]
name = "delta_fake"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.86"
fd-lock = "4.0.2"
nix = "0.28.0"
rand = "0.8.5"
use anyhow::{Context, Result};
use libc::{flock, F_RDLCK, F_SETLK, F_SETLKW, F_UNLCK, F_WRLCK, SEEK_SET};
use nix::libc;
use std::fs::{File, OpenOptions};
use std::os::fd::AsRawFd;
use std::os::unix::io::RawFd;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
use std::{fmt, io};
/// Reads up to `buffer.len()` bytes from `fd` at the absolute position `offset` via `pread(2)`.
///
/// The file offset shared by every handle on the same open file is never consulted or moved,
/// so concurrent callers cannot race on it. Returns the number of bytes actually read, which
/// may be smaller than the buffer (0 at end of file).
fn seekless_read(fd: i32, buffer: &mut [u8], offset: i64) -> io::Result<usize> {
    let capacity = buffer.len();
    let dst = buffer.as_mut_ptr().cast::<libc::c_void>();
    // SAFETY: `dst` points to `capacity` writable bytes owned by `buffer`, which outlives the call.
    let count = unsafe { libc::pread(fd, dst, capacity, offset) };
    // A negative return signals failure; errno holds the cause.
    usize::try_from(count).map_err(|_| io::Error::last_os_error())
}
/// Writes up to `buffer.len()` bytes to `fd` at the absolute position `offset` via `pwrite(2)`.
///
/// Like [`seekless_read`], this leaves the shared file offset untouched. Returns the number of
/// bytes the kernel accepted, which may be fewer than requested.
fn seekless_write(fd: i32, buffer: &[u8], offset: i64) -> io::Result<usize> {
    let pending = buffer.len();
    let src = buffer.as_ptr().cast::<libc::c_void>();
    // SAFETY: `src` points to `pending` readable bytes owned by `buffer` for the whole call.
    let count = unsafe { libc::pwrite(fd, src, pending, offset) };
    // A negative return signals failure; errno holds the cause.
    usize::try_from(count).map_err(|_| io::Error::last_os_error())
}
/// Attempts to acquire a lock on a file descriptor with a specified timeout.
///
/// # Arguments
/// * `fd` - The file descriptor on which to acquire the lock.
/// * `lock` - The lock structure describing the type and specifics of the lock.
/// * `timeout_override` - Duration to keep trying before giving up.
///
/// # Returns
/// * `Ok(())` if the lock was successfully acquired.
/// * `Err(io::Error)` if the lock could not be acquired within the timeout.
fn acquire_lock_with_timeout(
fd: RawFd,
lock: &mut flock,
timeout_override: Option<Duration>,
) -> io::Result<()> {
let timeout = timeout_override.unwrap_or(Duration::from_millis(1));
let start_time = Instant::now();
let action = if lock.l_type == libc::F_WRLCK {
"write"
} else {
"read"
};
loop {
if unsafe { libc::fcntl(fd, F_SETLKW, &mut *lock) } != -1 {
return Ok(());
} else if let Some(error_code) = io::Error::last_os_error().raw_os_error() {
match error_code {
libc::EACCES | libc::EAGAIN => {
if start_time.elapsed() > timeout {
return Err(io::Error::new(
io::ErrorKind::TimedOut,
format!("{} lock acquisition timed out", action),
));
}
println!("Timeout not reached, will sleep to acquire lock.");
thread::sleep(Duration::from_micros(10));
}
_ => {
eprintln!(
"Failed to acquire {} lock, unexpected error code: {}.",
action, error_code
);
return Err(io::Error::last_os_error());
}
}
}
}
}
/// A `SharedFile` lets several threads `read` and `write` byte ranges of one file. Each access
/// is guarded with an `fcntl(2)` byte-range (record) lock.
///
/// Locking caveat: POSIX record locks are owned by the *process*, not by a thread or a file
/// descriptor. They coordinate access with other processes, but they do not make threads of
/// this process wait for one another.
///
/// Every clone shares a single open `file`, so the file offset is shared as well. Calling `seek`
/// (or the `Read`/`Write` impls, which advance the offset) from several threads would race.
/// Reads and writes therefore go through `pread(2)`/`pwrite(2)`. These take an explicit
/// offset and never move the shared one.
#[derive(Clone)]
struct SharedFile {
    /// The open read/write handle shared by every clone.
    file: Arc<File>,
    /// Path the file was opened from; kept for diagnostics (`Debug` output).
    path: String,
}
impl fmt::Debug for SharedFile {
    /// Formats as `SharedFile { path: ... }`; the underlying `File` handle is deliberately omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut builder = f.debug_struct("SharedFile");
        builder.field("path", &self.path);
        builder.finish()
    }
}
impl SharedFile {
fn new(path: &str) -> Result<Self> {
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path)?;
Ok(Self {
file: Arc::new(file),
path: path.to_string(),
})
}
/// This is approximate because we are not guaranteeing that in-flight writes are commited.
fn approx_len(self) -> Result<u64> {
let file = Arc::clone(&self.file);
// NOTE: we do not want to use any technique with `seek` even if it is faster
Ok(file.metadata().context("reading file metadata")?.len())
}
/// Depends on `approx_len` so is not guaranteed to append exactly at end.
fn append(&self, data: &[u8]) -> Result<()> {
let end = <SharedFile as Clone>::clone(&self).approx_len()?;
// NOTE: we do not want to use any technique with `seek` even if it is faster
self.write(end, data)?;
Ok(())
}
/// Read lock a given range of a file and read from it.
fn read(&self, start: u64, length: usize) -> Result<Vec<u8>> {
let file = Arc::clone(&self.file);
// we avoid seek because this would be shared across threads!
let file = file.try_clone()?;
let mut buffer = vec![0u8; length];
let mut lock = flock {
l_type: F_RDLCK,
l_whence: SEEK_SET as i16,
l_start: start as libc::off_t,
l_len: length as libc::off_t,
l_pid: 0,
};
acquire_lock_with_timeout(file.as_raw_fd(), &mut lock, None)?;
seekless_read(file.as_raw_fd(), &mut buffer, start.try_into().unwrap())?;
lock.l_type = F_UNLCK;
unsafe {
libc::fcntl(file.as_raw_fd(), F_SETLK, &mut lock);
}
Ok(buffer)
}
/// Write lock a given range of a file and write to it.
fn write(&self, start: u64, mut data: &[u8]) -> Result<()> {
let file = Arc::clone(&self.file);
let file = file.try_clone()?;
let mut lock = flock {
l_type: F_WRLCK,
l_whence: SEEK_SET as i16,
l_start: start as libc::off_t,
l_len: data.len() as libc::off_t,
l_pid: 0,
};
acquire_lock_with_timeout(file.as_raw_fd(), &mut lock, None)?;
seekless_write(file.as_raw_fd(), &mut data, start.try_into().unwrap())?;
lock.l_type = F_UNLCK;
unsafe {
libc::fcntl(file.as_raw_fd(), F_SETLK, &mut lock);
}
Ok(())
}
}
/// Demo: seeds `/tmp/foo.txt`, then lets two readers and two writers race on overlapping
/// ranges before printing the final file contents.
fn main() -> Result<()> {
    let shared_file = SharedFile::new("/tmp/foo.txt")?;
    shared_file.append(b"Initial data, ")?;

    // The length could be used to pick random positions inside the file.
    let initial_len = shared_file.clone().approx_len()?;
    println!("initial length of file: {}", initial_len);

    // Spawned in the same order they are joined below.
    let handles = vec![
        thread::spawn({
            let reader = shared_file.clone();
            move || {
                let bytes = reader.read(0, 10).expect("handle1 read data");
                println!("Reader 1: {:?}", String::from_utf8_lossy(&bytes));
            }
        }),
        thread::spawn({
            let writer = shared_file.clone();
            move || {
                writer.write(5, b"Hello").expect("handle2 wrote data");
                println!("Writer 1 wrote 'Hello'");
            }
        }),
        thread::spawn({
            let reader = shared_file.clone();
            move || {
                let bytes = reader.read(0, 7).expect("handle3 read data");
                println!("Reader 2: {:?}", String::from_utf8_lossy(&bytes));
            }
        }),
        thread::spawn({
            let writer = shared_file.clone();
            move || {
                writer.write(5, b"Hallo").expect("handle4 wrote data");
                println!("Writer 2 wrote 'Hallo'");
            }
        }),
    ];
    for handle in handles {
        handle.join().unwrap();
    }

    let final_content = std::fs::read_to_string("/tmp/foo.txt")?;
    println!("Final content of the file: {}", final_content);
    Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment