-
-
Save rajivr/954bf2ab715b3b3bfa7712138dda1948 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//! Provides [`FdbDatabase`] type that implements [`Database`] trait. | |
use crate::error::check; | |
use crate::transaction::{ | |
FdbTransaction, ReadTransaction, ReadTransactionContext, Transaction, TransactionContext, | |
}; | |
use crate::FdbResult; | |
use std::ptr::{self, NonNull}; | |
use std::sync::Arc; | |
pub use crate::option::DatabaseOption; | |
/// A handle to FoundationDB database. All reads and writes to the | |
/// database are transactional. | |
/// | |
/// A [`FdbDatabase`] can be created using [`open_database`] function. | |
/// | |
/// [`open_database`]: crate::open_database | |
// | |
// *NOTE*: If you make changes to this type, make sure you update | |
// tests for `DummyFdbDatabase`, `DropTestDummyFdbDatabase` | |
// accordingly. | |
#[derive(Clone, Debug)] | |
pub struct FdbDatabase { | |
inner: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>, | |
} | |
impl Drop for FdbDatabase { | |
fn drop(&mut self) { | |
self.inner.take().map(|a| { | |
match Arc::try_unwrap(a) { | |
Ok(a) => unsafe { | |
fdb_sys::fdb_database_destroy(a.as_ptr()); | |
}, | |
Err(at) => { | |
drop(at); | |
} | |
}; | |
}); | |
} | |
} | |
// # Safety | |
// | |
// After `FdbDatabase` is created, `NonNull<fdb_sys::FdbDatabase>` is | |
// accessed read-only, till it is finally dropped. The main reason | |
// for adding `Send` and `Sync` traits is so that it can be moved to | |
// other threads. | |
unsafe impl Send for FdbDatabase {} | |
unsafe impl Sync for FdbDatabase {} | |
/// A mutable, lexicographically ordered mapping from binary keys to | |
/// binary values. [`Transactions`]s are used to manipulate data | |
/// within a single [`Database`] - multiple concurrent `Transactions` on | |
/// a [`Database`] enforce **ACID** properties. | |
pub trait Database: TransactionContext { | |
type Transaction: Transaction; | |
/// Set options on a [`Database`]. | |
fn set_option(&self, option: DatabaseOption) -> FdbResult<()>; | |
/// TODO | |
fn create_transaction(&self) -> FdbResult<Self::Transaction>; | |
} | |
impl ReadTransactionContext for FdbDatabase { | |
fn read<T, F>(&self, f: F) -> FdbResult<T> | |
where | |
Self: Sized, | |
F: Fn(&dyn ReadTransaction) -> FdbResult<T>, | |
{ | |
let t = self.create_transaction()?; | |
loop { | |
let ret_val = f(&t); | |
// Closure returned an error. Check if its retryable. | |
if let Err(e) = ret_val { | |
if t.on_error(e).join().is_err() { | |
return ret_val; | |
} else { | |
continue; | |
} | |
} | |
// We don't need to commit read transaction, return `Ok(T)` | |
return ret_val; | |
} | |
} | |
} | |
impl TransactionContext for FdbDatabase { | |
type Database = FdbDatabase; | |
fn run<T, F>(&self, f: F) -> FdbResult<T> | |
where | |
Self: Sized, | |
F: Fn(&dyn Transaction<Database = Self::Database>) -> FdbResult<T>, | |
{ | |
let t = self.create_transaction()?; | |
loop { | |
let ret_val = f(&t); | |
// Closure returned an error. Check if its retryable. | |
if let Err(e) = ret_val { | |
if t.on_error(e).join().is_err() { | |
return ret_val; | |
} else { | |
continue; | |
} | |
} | |
// Commit returned an error. Check if its retryable. | |
if let Err(e) = t.commit().join() { | |
if t.on_error(e).join().is_err() { | |
return ret_val; | |
} else { | |
continue; | |
} | |
} | |
// Commit successful, return `Ok(T)` | |
return ret_val; | |
} | |
} | |
} | |
impl Database for FdbDatabase { | |
type Transaction = FdbTransaction; | |
fn set_option(&self, option: DatabaseOption) -> FdbResult<()> { | |
// Safety: It is safe to unwrap here because if we have given | |
// out an `FdbDatabase` then `inner` *must* be | |
// `Some<Arc<...>>`. | |
unsafe { option.apply((*(self.inner.as_ref().unwrap())).as_ptr()) } | |
} | |
fn create_transaction(&self) -> FdbResult<FdbTransaction> { | |
let mut ptr: *mut fdb_sys::FDB_transaction = ptr::null_mut(); | |
// Safety: It is safe to unwrap here because if we have given | |
// out an `FdbDatabase` then `inner` *must* be | |
// `Some<Arc<...>>`. | |
check(unsafe { | |
fdb_sys::fdb_database_create_transaction( | |
(*(self.inner.as_ref().unwrap())).as_ptr(), | |
&mut ptr, | |
) | |
}) | |
.map(|_| { | |
FdbTransaction::new( | |
NonNull::new(ptr).expect( | |
"fdb_database_create_transaction returned null, but did not return an error", | |
), | |
self.clone(), | |
) | |
}) | |
} | |
} | |
#[doc(hidden)] | |
pub mod open_database { | |
use super::FdbDatabase; | |
use crate::error::check; | |
use crate::FdbError; | |
use crate::FdbResult; | |
use std::ffi::CString; | |
use std::path::Path; | |
use std::ptr::{self, NonNull}; | |
use std::sync::Arc; | |
/// Returns [`Database`] handle to the Foundation DB cluster | |
/// identified by the provided cluster file. | |
/// | |
/// A single client can use this function multiple times to connect to | |
/// different clusters simultaneously, with each invocation requiring | |
/// its own cluster file. | |
/// | |
/// [`Database`]: super::Database | |
pub fn open_database<P>(cluster_file_path: P) -> FdbResult<FdbDatabase> | |
where | |
P: AsRef<Path>, | |
{ | |
let path = CString::new( | |
cluster_file_path | |
.as_ref() | |
.to_str() | |
.ok_or(FdbError::new(100))?, | |
) | |
.map_err(|_| FdbError::new(100))?; | |
// `path_ptr` is valid till we do `drop(path)`. | |
let path_ptr = path.as_ptr(); | |
let mut v: *mut fdb_sys::FDBDatabase = ptr::null_mut(); | |
let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) }; | |
drop(path); | |
// At this stage, we either have an error or a valid v. | |
check(err)?; | |
Ok(FdbDatabase { | |
inner: Some(Arc::new(NonNull::new(v).expect( | |
"fdb_create_database returned null, but did not return an error", | |
))), | |
}) | |
} | |
} | |
#[cfg(test)] | |
mod tests { | |
use super::FdbDatabase; | |
use static_assertions::{assert_impl_all, assert_not_impl_any}; | |
use std::ptr::NonNull; | |
use std::sync::atomic::{AtomicBool, Ordering}; | |
use std::sync::Arc; | |
use std::thread; | |
assert_impl_all!(FdbDatabase: Send, Sync, Clone); | |
assert_not_impl_any!(FdbDatabase: Copy); | |
#[derive(Clone, Debug)] | |
struct DummyFdbDatabase { | |
inner: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>, | |
} | |
unsafe impl Send for DummyFdbDatabase {} | |
unsafe impl Sync for DummyFdbDatabase {} | |
#[test] | |
fn trait_bounds() { | |
fn trait_bounds_for_fdb_database<T>(_t: T) | |
where | |
T: Send + Sync + 'static, | |
{ | |
} | |
let d = DummyFdbDatabase { | |
inner: Some(Arc::new(NonNull::dangling())), | |
}; | |
trait_bounds_for_fdb_database(d); | |
} | |
static mut DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED: AtomicBool = AtomicBool::new(false); | |
#[derive(Clone, Debug)] | |
struct DropTestDummyFdbDatabase { | |
inner: Option<Arc<NonNull<fdb_sys::FDBDatabase>>>, | |
} | |
unsafe impl Send for DropTestDummyFdbDatabase {} | |
unsafe impl Sync for DropTestDummyFdbDatabase {} | |
impl Drop for DropTestDummyFdbDatabase { | |
fn drop(&mut self) { | |
self.inner.take().map(|a| { | |
match Arc::try_unwrap(a) { | |
Ok(_) => { | |
unsafe { | |
DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.store(true, Ordering::SeqCst); | |
}; | |
} | |
Err(at) => { | |
drop(at); | |
} | |
}; | |
}); | |
} | |
} | |
#[test] | |
fn multiple_drop() { | |
let d0 = DropTestDummyFdbDatabase { | |
inner: Some(Arc::new(NonNull::dangling())), | |
}; | |
// Initially this is false. | |
assert_eq!( | |
unsafe { DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.load(Ordering::SeqCst) }, | |
false | |
); | |
let d1 = d0.clone(); | |
assert_eq!(Arc::strong_count(d1.inner.as_ref().unwrap()), 2); | |
let t1 = thread::spawn(move || { | |
let _ = d1; | |
}); | |
t1.join().unwrap(); | |
assert_eq!(Arc::strong_count(d0.inner.as_ref().unwrap()), 1); | |
let d2 = d0.clone(); | |
let d3 = d2.clone(); | |
let t2_3 = thread::spawn(move || { | |
let _ = d2; | |
let _ = d3; | |
}); | |
t2_3.join().unwrap(); | |
assert_eq!(Arc::strong_count(d0.inner.as_ref().unwrap()), 1); | |
drop(d0); | |
assert_eq!( | |
unsafe { DROP_TEST_DUMMY_FDB_DATABASE_HAS_DROPPED.load(Ordering::SeqCst) }, | |
true | |
); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment