Skip to content

Instantly share code, notes, and snippets.

@rajivr
Created May 5, 2022 01:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rajivr/ad899849821589abba299c7320eb2fc3 to your computer and use it in GitHub Desktop.
Save rajivr/ad899849821589abba299c7320eb2fc3 to your computer and use it in GitHub Desktop.
use bytes::Bytes;
use fdb::database::FdbDatabase;
use fdb::error::FdbResult;
use fdb::range::{Range, RangeOptions};
use fdb::transaction::{FdbTransaction, Transaction};
use fdb::tuple::Tuple;
use fdb::{Key, Mapper, Value};
use tokio::runtime::Runtime;
use tokio_stream::StreamExt;
use std::env;
use std::error::Error;
// In Record Layer, there is a class `FDBRecordStoreKeyspace` that
// describes various spaces such as `STORE_INFO`, `RECORD`, `INDEX`,
// `INDEX_SECONDARY_SPACE`. etc.,
//
// "prefix" can be the subspace of the record store.
//
// ```
// ("prefix", "INDEX", "index-key-of-record-00012345", "primary-key-of-record-00012345") = ()
// ```
//
// Stores index
//
// ```
// ("prefix", "RECORD", "primary-key-of-record-00012345", 0) = ("data-of-record-00012345", 0)
// ("prefix", "RECORD", "primary-key-of-record-00012345", 1) = ("data-of-record-00012345", 1)
// ("prefix", "RECORD", "primary-key-of-record-00012345", 2) = ("data-of-record-00012345", 2)
// ```
//
// Stores record. Record key prefix excludes the "split" part, while
// record key includes the split part.
const PREFIX: &str = "prefix";
const RECORD: &str = "RECORD";
const INDEX: &str = "INDEX";
fn empty() -> Bytes {
Tuple::new().pack()
}
fn primary_key(i: u32) -> String {
format!("primary-key-of-record-{:08}", i)
}
fn index_key(i: u32) -> String {
format!("index-key-of-record-{:08}", i)
}
fn data_of_record(i: u32) -> String {
format!("data-of-record-{:08}", i)
}
fn mapper() -> Mapper {
let mapper_tup: (&'static str, &'static str, &'static str, &'static str) =
(PREFIX, RECORD, "{K[3]}", "{...}");
let mapper = {
let mut tup = Tuple::new();
// PREFIX
tup.add_string((mapper_tup.0).to_string());
// RECORD
tup.add_string((mapper_tup.1).to_string());
// "{K[3]}"
tup.add_string((mapper_tup.2).to_string());
// "{...}"
tup.add_string((mapper_tup.3).to_string());
tup
};
mapper.into()
}
const SPLIT_SIZE: u32 = 3;
// Example
//
// ```
// ("prefix", "INDEX", "index-key-of-record-00012345", "primary-key-of-record-00012345")
// ```
fn index_entry_key(i: u32) -> Key {
let index_tup: (&'static str, &'static str, String, String) =
(PREFIX, INDEX, index_key(i), primary_key(i));
let index_key = {
let mut tup = Tuple::new();
// PREFIX
tup.add_string((index_tup.0).to_string());
// INDEX
tup.add_string((index_tup.1).to_string());
tup.add_string(index_tup.2);
tup.add_string(index_tup.3);
tup
};
index_key.pack().into()
}
// Example
//
// ```
// ("prefix", "RECORD", "primary-key-of-record-00012345")
// ```
//
// Is an example of record key prefix, without split. We don't use it
// in our example, but it is used in Java integration test.
#[allow(dead_code)]
fn record_key_prefix(i: u32) -> Tuple {
let rec_key_prefix_tup: (&'static str, &'static str, String) = (PREFIX, RECORD, primary_key(i));
{
let mut tup = Tuple::new();
// PREFIX
tup.add_string((rec_key_prefix_tup.0).to_string());
// RECORD
tup.add_string((rec_key_prefix_tup.1).to_string());
tup.add_string(rec_key_prefix_tup.2);
tup
}
}
// Example (contains split)
//
// ```
// ("prefix", "RECORD", "primary-key-of-record-00012345", 2)
// ```
fn record_key(i: u32, split: u32) -> Key {
let rec_key_tup: (&'static str, &'static str, String, u32) =
(PREFIX, RECORD, primary_key(i), split);
let rec_key = {
let mut tup = Tuple::new();
// PREFIX
tup.add_string((rec_key_tup.0).to_string());
// RECORD
tup.add_string((rec_key_tup.1).to_string());
tup.add_string(rec_key_tup.2);
tup.add_i64(rec_key_tup.3.into());
tup
};
rec_key.pack().into()
}
// Example (contains split)
//
// ```
// ("data-of-record-00012345", 2)
// ```
fn record_value(i: u32, split: u32) -> Value {
let rec_value_tup: (String, u32) = (data_of_record(i), split);
let rec_value = {
let mut tup = Tuple::new();
tup.add_string(rec_value_tup.0);
tup.add_i64(rec_value_tup.1.into());
tup
};
rec_value.pack().into()
}
fn insert_record_with_index(tr: &FdbTransaction, i: u32) {
tr.set(index_entry_key(i), empty());
(0..SPLIT_SIZE).for_each(|j| {
tr.set(record_key(i, j), record_value(i, j));
});
}
async fn insert_record_with_indexes(n: u32, db: &FdbDatabase) -> FdbResult<()> {
db.run(|tr| async move {
(0..n).for_each(|i| insert_record_with_index(&tr, i));
Ok(())
})
.await
}
fn main() -> Result<(), Box<dyn Error>> {
let fdb_cluster_file = env::var("FDB_CLUSTER_FILE").expect("FDB_CLUSTER_FILE not defined!");
unsafe {
// Assume this to be atleast `710`
fdb::select_api_version(fdb::FDB_API_VERSION as i32);
fdb::start_network();
}
let fdb_database = fdb::open_database(fdb_cluster_file)?;
let rt = Runtime::new()?;
let cloned_fdb_database = fdb_database.clone();
rt.block_on(async {
let fdb_database = cloned_fdb_database;
// Clear the database.
fdb_database
.run(|tr| async move {
tr.clear_range(Range::new(Bytes::new(), Bytes::from_static(b"\xFF")));
Ok(())
})
.await?;
// Records with primary key of 0..=4 will be inserted.
insert_record_with_indexes(5, &fdb_database).await?;
let _ = fdb_database
.run(|tr| async move {
// Get mapped key values from 1..=3.
let mut mapped_range_stream = Range::new(index_entry_key(1), index_entry_key(4))
.into_mapped_stream(&tr, mapper(), RangeOptions::default());
while let Some(x) = mapped_range_stream.next().await {
let (kv, mapped_range, mapped_kvs) = x?.into_parts();
println!();
println!("-----");
let (kv_key, kv_value) = kv.into_parts();
println!("kv_key: {:?}", Tuple::from_bytes(kv_key)?);
println!("kv_value: {:?}", Tuple::from_bytes(kv_value)?);
println!();
let (mapped_range_begin_key, mapped_range_end_key) = mapped_range.into_parts();
println!(
"mapped_range_begin_key: {:?}",
Tuple::from_bytes(mapped_range_begin_key)?
);
// Not a tuple
println!("mapped_range_end_key: {:?}", mapped_range_end_key);
println!();
for mapped_kv in mapped_kvs {
let (mapped_kv_key, mapped_kv_value) = mapped_kv.into_parts();
println!("mapped_kv_key: {:?}", Tuple::from_bytes(mapped_kv_key)?);
println!("mapped_kv_value: {:?}", Tuple::from_bytes(mapped_kv_value)?);
}
println!("-----");
}
Ok(())
})
.await?;
Result::<(), Box<dyn Error>>::Ok(())
})?;
drop(fdb_database);
unsafe {
fdb::stop_network();
}
Ok(())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment