Skip to content

Instantly share code, notes, and snippets.

@rajivr
Created August 10, 2021 07:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rajivr/954bf2ab715b3b3bfa7712138dda1948 to your computer and use it in GitHub Desktop.
Save rajivr/954bf2ab715b3b3bfa7712138dda1948 to your computer and use it in GitHub Desktop.
//! Provides [`FdbDatabase`] type that implements [`Database`] trait.
use crate::error::check;
use crate::transaction::{
FdbTransaction, ReadTransaction, ReadTransactionContext, Transaction, TransactionContext,
};
use crate::FdbResult;
use std::ptr::{self, NonNull};
use std::sync::Arc;
pub use crate::option::DatabaseOption;
/// A handle to a FoundationDB database. All reads and writes to the
/// database are transactional.
///
/// A [`FdbDatabase`] can be created using [`open_database`] function.
///
/// Cloning an [`FdbDatabase`] clones the inner [`Arc`], so every clone
/// refers to the same native `FDBDatabase` pointer.
///
/// [`open_database`]: crate::open_database
//
// *NOTE*: If you make changes to this type, make sure you update
// tests for `DummyFdbDatabase`, `DropTestDummyFdbDatabase`
// accordingly.
#[derive(Clone, Debug)]
pub struct FdbDatabase {
// Shared pointer to the native database object. It is wrapped in
// `Option` so that `Drop::drop`, which only receives `&mut self`, can
// `take()` ownership of the `Arc` and release the native handle when
// it holds the only remaining reference.
inner: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>,
}
impl Drop for FdbDatabase {
    /// Releases this handle's reference to the native database and
    /// destroys the native database when this was the last reference.
    fn drop(&mut self) {
        if let Some(shared) = self.inner.take() {
            // `Arc::into_inner` (Rust 1.70+) is used instead of
            // `Arc::try_unwrap(..)` followed by dropping the `Err`
            // value. With `try_unwrap`, two clones dropped concurrently
            // on different threads can both observe a strong count of
            // two, both fail to unwrap, and both simply drop their
            // `Arc`; the native handle would then never be destroyed.
            // `into_inner` guarantees that exactly one of the racing
            // calls receives the inner value.
            if let Some(db) = Arc::into_inner(shared) {
                // SAFETY: `db` was the last reference to the pointer, so
                // no other `FdbDatabase` clone can use it after this
                // point, and it is destroyed exactly once.
                unsafe {
                    fdb_sys::fdb_database_destroy(db.as_ptr());
                }
            }
        }
    }
}
// # Safety
//
// After an `FdbDatabase` is created, the wrapped
// `NonNull<fdb_sys::FDBDatabase>` is accessed read-only, till it is
// finally dropped. The main reason for adding `Send` and `Sync`
// traits is so that an `FdbDatabase` can be moved to other threads.
//
// NOTE(review): soundness also relies on the FoundationDB C API
// allowing an `FDBDatabase` handle to be used from several threads at
// once - confirm against the C API documentation.
unsafe impl Send for FdbDatabase {}
unsafe impl Sync for FdbDatabase {}
/// A mutable, lexicographically ordered mapping from binary keys to
/// binary values. [`Transaction`]s are used to manipulate data
/// within a single [`Database`] - multiple concurrent [`Transaction`]s on
/// a [`Database`] enforce **ACID** properties.
pub trait Database: TransactionContext {
/// The [`Transaction`] type returned by
/// [`Database::create_transaction`].
type Transaction: Transaction;
/// Set options on a [`Database`].
fn set_option(&self, option: DatabaseOption) -> FdbResult<()>;
/// Creates a new [`Transaction`] on this [`Database`].
///
/// # Errors
///
/// Returns an error if the transaction could not be created.
fn create_transaction(&self) -> FdbResult<Self::Transaction>;
}
impl ReadTransactionContext for FdbDatabase {
    /// Runs the read-only closure `f` against a transaction created on
    /// this database, re-running it for as long as `on_error` resolves
    /// successfully for the error that `f` returned.
    ///
    /// The transaction is never committed, because a read needs no
    /// commit.
    ///
    /// # Errors
    ///
    /// Returns the error from creating the transaction, or the error
    /// returned by `f` once `on_error` itself resolves to an error.
    fn read<T, F>(&self, f: F) -> FdbResult<T>
    where
        Self: Sized,
        F: Fn(&dyn ReadTransaction) -> FdbResult<T>,
    {
        let txn = self.create_transaction()?;

        loop {
            match f(&txn) {
                // Nothing to commit for a read; hand the value back.
                Ok(value) => return Ok(value),
                Err(err) => {
                    // `on_error` failing means no retry is possible:
                    // report the closure's own error, not the one
                    // produced by `on_error`.
                    if txn.on_error(err).join().is_err() {
                        return Err(err);
                    }
                    // Otherwise fall through and run `f` again.
                }
            }
        }
    }
}
impl TransactionContext for FdbDatabase {
    type Database = FdbDatabase;

    /// Runs the closure `f` against a transaction created on this
    /// database and then commits that transaction.
    ///
    /// When either `f` or the commit fails, the error is passed to
    /// `on_error`; if that resolves successfully the whole
    /// closure-then-commit sequence is attempted again on the same
    /// transaction.
    ///
    /// # Errors
    ///
    /// Returns the error from creating the transaction, or the error
    /// from `f` or from the commit once `on_error` resolves to an
    /// error for it.
    fn run<T, F>(&self, f: F) -> FdbResult<T>
    where
        Self: Sized,
        F: Fn(&dyn Transaction<Database = Self::Database>) -> FdbResult<T>,
    {
        let t = self.create_transaction()?;

        loop {
            // Closure returned an error. Check if it is retryable.
            let value = match f(&t) {
                Ok(value) => value,
                Err(e) => {
                    if t.on_error(e).join().is_err() {
                        return Err(e);
                    }
                    continue;
                }
            };

            // Commit returned an error. Check if it is retryable.
            if let Err(e) = t.commit().join() {
                if t.on_error(e).join().is_err() {
                    // The commit error must be reported here. Returning
                    // the closure's `Ok` value would tell the caller
                    // that a transaction which never committed had
                    // succeeded.
                    return Err(e);
                }
                continue;
            }

            // Commit successful, return `Ok(T)`.
            return Ok(value);
        }
    }
}
impl Database for FdbDatabase {
    type Transaction = FdbTransaction;

    /// Applies `option` to the native database handle.
    ///
    /// # Panics
    ///
    /// Panics if `inner` is `None`. A live `FdbDatabase` always holds
    /// `Some(..)`; the field is only emptied inside `Drop::drop`.
    fn set_option(&self, option: DatabaseOption) -> FdbResult<()> {
        let db_ptr = self.inner.as_ref().unwrap().as_ptr();

        // SAFETY: `db_ptr` comes from the `NonNull` held by `self`,
        // which is only destroyed when the last handle is dropped.
        unsafe { option.apply(db_ptr) }
    }

    /// Creates a new transaction on this database. The transaction is
    /// given its own clone of this database handle.
    ///
    /// # Errors
    ///
    /// Returns the error reported by
    /// `fdb_database_create_transaction`.
    ///
    /// # Panics
    ///
    /// Panics if `inner` is `None` (see [`FdbDatabase::set_option`]),
    /// or if the C API reports success but leaves the transaction
    /// pointer null.
    fn create_transaction(&self) -> FdbResult<FdbTransaction> {
        let db_ptr = self.inner.as_ref().unwrap().as_ptr();
        let mut txn_ptr: *mut fdb_sys::FDB_transaction = ptr::null_mut();

        // SAFETY: `db_ptr` is kept alive by `self`, and `txn_ptr` is a
        // valid location for the out-parameter.
        check(unsafe { fdb_sys::fdb_database_create_transaction(db_ptr, &mut txn_ptr) })?;

        let txn_ptr = NonNull::new(txn_ptr)
            .expect("fdb_database_create_transaction returned null, but did not return an error");

        Ok(FdbTransaction::new(txn_ptr, self.clone()))
    }
}
#[doc(hidden)]
pub mod open_database {
    use super::FdbDatabase;
    use crate::error::check;
    use crate::FdbError;
    use crate::FdbResult;
    use std::ffi::CString;
    use std::path::Path;
    use std::ptr::{self, NonNull};
    use std::sync::Arc;

    /// Returns [`Database`] handle to the Foundation DB cluster
    /// identified by the provided cluster file.
    ///
    /// A single client can use this function multiple times to connect to
    /// different clusters simultaneously, with each invocation requiring
    /// its own cluster file.
    ///
    /// # Errors
    ///
    /// Returns `FdbError::new(100)` when `cluster_file_path` is not
    /// valid UTF-8 or contains an interior NUL byte, and otherwise
    /// whatever error `fdb_create_database` reports.
    ///
    /// # Panics
    ///
    /// Panics if `fdb_create_database` reports success but leaves the
    /// database pointer null.
    ///
    /// [`Database`]: super::Database
    pub fn open_database<P>(cluster_file_path: P) -> FdbResult<FdbDatabase>
    where
        P: AsRef<Path>,
    {
        // NOTE(review): both conversion failures map to error code
        // `100` - confirm that this is the intended code.
        let path_str = cluster_file_path
            .as_ref()
            .to_str()
            .ok_or(FdbError::new(100))?;
        let c_path = CString::new(path_str).map_err(|_| FdbError::new(100))?;

        let mut db_ptr: *mut fdb_sys::FDBDatabase = ptr::null_mut();

        // SAFETY: `c_path` owns the NUL-terminated buffer and outlives
        // the call; `db_ptr` is a valid out-parameter location.
        let err = unsafe { fdb_sys::fdb_create_database(c_path.as_ptr(), &mut db_ptr) };

        // At this stage, we either have an error or a valid `db_ptr`.
        check(err)?;

        let db_ptr = NonNull::new(db_ptr)
            .expect("fdb_create_database returned null, but did not return an error");

        Ok(FdbDatabase {
            inner: Some(Arc::new(db_ptr)),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::FdbDatabase;
    use static_assertions::{assert_impl_all, assert_not_impl_any};
    use std::ptr::NonNull;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;

    assert_impl_all!(FdbDatabase: Send, Sync, Clone);
    assert_not_impl_any!(FdbDatabase: Copy);

    /// Stand-in with the same field layout as `FdbDatabase`, used to
    /// check trait bounds without a real database.
    #[derive(Clone, Debug)]
    struct DummyFdbDatabase {
        inner: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>,
    }

    unsafe impl Send for DummyFdbDatabase {}
    unsafe impl Sync for DummyFdbDatabase {}

    #[test]
    fn trait_bounds() {
        fn trait_bounds_for_fdb_database<T>(_t: T)
        where
            T: Send + Sync + 'static,
        {
        }

        let d = DummyFdbDatabase {
            inner: Some(Arc::new(NonNull::dangling())),
        };
        trait_bounds_for_fdb_database(d);
    }

    // `AtomicBool` already provides interior mutability, so a plain
    // `static` is enough; `static mut` would force `unsafe` on every
    // access for no benefit.
    static DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED: AtomicBool = AtomicBool::new(false);

    /// Stand-in for `FdbDatabase` that, instead of calling into
    /// FoundationDB, records when the last handle releases the shared
    /// pointer.
    #[derive(Clone, Debug)]
    struct DropTestDummyFdbDatabase {
        inner: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>,
    }

    unsafe impl Send for DropTestDummyFdbDatabase {}
    unsafe impl Sync for DropTestDummyFdbDatabase {}

    impl Drop for DropTestDummyFdbDatabase {
        fn drop(&mut self) {
            if let Some(shared) = self.inner.take() {
                // Only the handle that gives up the final reference
                // gets `Some(..)` back, even when handles are dropped
                // concurrently.
                if Arc::into_inner(shared).is_some() {
                    DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.store(true, Ordering::SeqCst);
                }
            }
        }
    }

    #[test]
    fn multiple_drop() {
        let d0 = DropTestDummyFdbDatabase {
            inner: Some(Arc::new(NonNull::dangling())),
        };

        // Initially this is false.
        assert!(!DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.load(Ordering::SeqCst));

        let d1 = d0.clone();
        assert_eq!(Arc::strong_count(d1.inner.as_ref().unwrap()), 2);

        // `drop(..)` rather than `let _ = ..`: under Rust 2021 closure
        // capture rules `let _ = d1;` does not capture `d1`, so the
        // clone would stay alive on this thread and the strong-count
        // assertion below would fail.
        let t1 = thread::spawn(move || {
            drop(d1);
        });
        t1.join().unwrap();
        assert_eq!(Arc::strong_count(d0.inner.as_ref().unwrap()), 1);

        let d2 = d0.clone();
        let d3 = d2.clone();
        let t2_3 = thread::spawn(move || {
            drop(d2);
            drop(d3);
        });
        t2_3.join().unwrap();
        assert_eq!(Arc::strong_count(d0.inner.as_ref().unwrap()), 1);

        // Dropping the final handle must trigger the release path.
        drop(d0);
        assert!(DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.load(Ordering::SeqCst));
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment