Skip to content

Instantly share code, notes, and snippets.

@mooreniemi
Created May 20, 2024 03:17
Show Gist options
  • Save mooreniemi/bb23d386ce6eb84d764cbb834e4197fe to your computer and use it in GitHub Desktop.
Save mooreniemi/bb23d386ce6eb84d764cbb834e4197fe to your computer and use it in GitHub Desktop.
[package]
name = "delta_fake"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.86"
dashmap = "5.5.3"
fd-lock = "4.0.2"
left-right = "0.11.5"
memmap2 = "0.9.4"
nix = "0.28.0"
rand = "0.8.5"
hyper = "1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tiny_http = "0.12" # pinned: a bare "*" wildcard makes builds non-reproducible
tokio = { version = "1", features = ["full"] }
reqwest = { version = "0.12", features = ["json"] }
use rand::{distributions::Alphanumeric, seq::IteratorRandom, Rng};
use reqwest::{Client, StatusCode};
use serde_json::json;
use std::collections::HashSet;
use tokio;
use tokio::time::{Duration, Instant};
/// Submits one key/value pair to `submit_url`, then immediately reads it back
/// from `lookup_url` and asserts round-trip fidelity.
///
/// Two durations are appended to `times` per successful call: first the
/// submit latency, then the lookup latency.
///
/// # Errors
/// Returns any transport error or non-2xx status from either request.
///
/// # Panics
/// Panics if the lookup body lacks a string `"value"` field, or if the value
/// read back differs from the value submitted.
async fn send_key_value(
    client: &Client,
    submit_url: &str,
    lookup_url: &str,
    key: &str,
    value: &str,
    times: &mut Vec<Duration>,
) -> Result<(), reqwest::Error> {
    let body = json!({ "key": key, "value": value });

    // Measure time for submission.
    let start = Instant::now();
    client
        .post(submit_url)
        .json(&body)
        .send()
        .await?
        .error_for_status()?; // surface 4xx/5xx as Err instead of continuing
    times.push(start.elapsed());

    // Measure time for lookup.
    let start = Instant::now();
    let response = client
        .get(format!("{}?key={}", lookup_url, key))
        .send()
        .await?
        .error_for_status()?;
    times.push(start.elapsed());

    // Verify the round trip: the value read back must equal what we sent.
    if response.status() == StatusCode::OK {
        let payload = response.json::<serde_json::Value>().await?;
        let r_value = payload
            .get("value")
            .and_then(|v| v.as_str())
            .expect("lookup response must contain a string `value` field");
        assert_eq!(value, r_value, "we got back what we sent in");
    }
    Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::new();
let submit_url = "http://localhost:6000/submit";
let lookup_url = "http://localhost:6000/lookup";
let mut rng = rand::thread_rng();
let mut keys: HashSet<String> = HashSet::new();
let mut times: Vec<Duration> = Vec::new();
for _ in 0..100 {
// sometimes edit, sometimes create
let key = if rng.gen_bool(0.05) && !keys.is_empty() {
keys.iter().choose(&mut rng).unwrap().to_string()
} else {
let new_key: String = std::iter::repeat(())
.map(|()| rng.sample(Alphanumeric))
.map(char::from)
.take(10)
.collect();
keys.insert(new_key.clone());
new_key
};
let value: String = std::iter::repeat(())
.map(|()| rng.sample(Alphanumeric))
.map(char::from)
.take(20)
.collect();
if let Err(e) =
send_key_value(&client, submit_url, lookup_url, &key, &value, &mut times).await
{
eprintln!("Error sending or looking up key-value: {}", e);
}
}
// Calculate and print the average times
let average_time = times.iter().sum::<Duration>() / times.len() as u32;
println!("Average time for operations: {:?}", average_time);
Ok(())
}
use dashmap::DashMap;
use memmap2::{Mmap, MmapMut, MmapOptions};
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::fs::{File, OpenOptions};
use std::io;
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering};
use std::sync::mpsc::Sender;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use tiny_http::{Header, Method, Request, Response, Server, StatusCode};
/// One key/value pair: the JSON body of a `/submit` request, and the message
/// type sent from the HTTP handler to the writer thread over the mpsc channel.
#[derive(Deserialize, Serialize, Debug)]
struct KeyValue {
key: String,
value: String,
}
fn handle_request(
mut request: Request,
sender: &Sender<KeyValue>,
kv_store: &Arc<AtomicPtr<ManagedMemory>>,
) {
match (
request.method(),
request.url().split('?').nth(0).unwrap_or("/uknown"),
) {
// Handle POST requests to submit key-value pairs
(&Method::Post, "/submit") => {
let content_length = request
.headers()
.iter()
.find(|h| h.field.as_str().to_ascii_lowercase() == "content-length")
.and_then(|h| h.value.as_str().parse::<usize>().ok())
.expect("need content length");
let mut content = vec![0; content_length];
request.as_reader().read_exact(&mut content).unwrap();
println!("Received content: {}", String::from_utf8_lossy(&content)); // Print raw JSON content
if let Ok(key_value) = serde_json::from_slice::<KeyValue>(&content) {
sender
.send(key_value)
.expect("Failed to send key-value to the writer");
let json = json!({"status": "submitted"});
let response = Response::from_string(json.to_string())
.with_header(
Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(),
)
.with_status_code(StatusCode(200));
request.respond(response).unwrap();
} else {
let json = json!({"status": "error: invalid data"});
let response = Response::from_string(json.to_string())
.with_header(
Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(),
)
.with_status_code(StatusCode(400));
request.respond(response).unwrap();
}
}
// Handle GET requests to lookup key-value pairs
(&Method::Get, "/lookup") => {
let key = request
.url()
.split('?')
.nth(1)
.and_then(|q| q.split('=').nth(1))
.unwrap_or("");
let mm_ptr = kv_store.load(Ordering::SeqCst);
let mm = unsafe { &*mm_ptr };
if let Some(entry) = mm.locations.get(key) {
let (pos, len) = *entry;
let value = String::from_utf8_lossy(&mm.mmap_immutable[pos..pos + len]);
println!("Return data for key: {}={:?}", key, value);
let json = json!({"key": key, "value": value});
let response = Response::from_string(json.to_string())
.with_header(
Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(),
)
.with_status_code(StatusCode(200));
request.respond(response).unwrap();
} else {
eprintln!("expected to read {}", key);
let json = json!({"status": format!("error: {} key not found", key)});
let response = Response::from_string(json.to_string())
.with_header(
Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(),
)
.with_status_code(StatusCode(404));
request.respond(response).unwrap();
}
}
// Handle not found
(_method, other) => {
let json = json!({"status": format!("error: {} route not found", other)});
let response = Response::from_string(json.to_string())
.with_header(
Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap(),
)
.with_status_code(StatusCode(404));
request.respond(response).unwrap();
}
}
}
/// An mmap-backed append-only value store plus an in-memory key index.
struct ManagedMemory {
// Writable view of the data file; the writer thread appends values here.
mmap: MmapMut,
// Read-only view of the same file, used by lookup and the sampling reader.
mmap_immutable: Mmap,
// FIXME: just simple to use DashMap to start but could switch to LeftRight or whatever
// key -> (byte offset, byte length) into the mmap.
locations: DashMap<String, (usize, usize)>,
// Append cursor: next free byte offset in the mmap.
highest_byte_position: AtomicUsize,
// Bytes orphaned by overwrites; drives the compaction trigger.
unused_bytes: AtomicUsize,
}
impl ManagedMemory {
    /// Wraps an already-compacted file, adopting the rebuilt index and append
    /// cursor produced by `compact_file`.
    ///
    /// # Errors
    /// Fails if either memory mapping cannot be created (the original
    /// `expect`ed inside an `io::Result`-returning function).
    fn from_compacted(
        file: File,
        new_locations: DashMap<String, (usize, usize)>,
        highest_byte_position: AtomicUsize,
    ) -> io::Result<Self> {
        // SAFETY: mappings are backed by `file`, which Self keeps alive;
        // as with all memmap2 use, external truncation of the file is UB.
        let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
        let mmap_immutable = unsafe { MmapOptions::new().map(&file)? };
        Ok(Self {
            mmap,
            mmap_immutable,
            locations: new_locations,
            highest_byte_position,
            unused_bytes: AtomicUsize::new(0),
        })
    }

    /// Creates (or reuses) the data file at `file_path`, sizes it to `size`
    /// bytes, and maps it with an empty index.
    ///
    /// # Errors
    /// Fails if the file cannot be opened, sized, or mapped.
    fn new(file_path: &str, size: u64) -> io::Result<Self> {
        let file = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(file_path)?;
        file.set_len(size)?;
        // SAFETY: same mapping invariants as `from_compacted`.
        let mmap = unsafe { MmapOptions::new().map_mut(&file)? };
        let mmap_immutable = unsafe { MmapOptions::new().map(&file)? };
        Ok(Self {
            mmap,
            mmap_immutable,
            locations: DashMap::new(),
            highest_byte_position: AtomicUsize::new(0),
            unused_bytes: AtomicUsize::new(0),
        })
    }

    /// Returns whether `data` fits between the current append cursor and the
    /// end of the mapping. (`&[u8]` generalizes the old `&Vec<u8>`; existing
    /// `&vec` call sites coerce unchanged.)
    fn room_to_write(&self, data: &[u8]) -> bool {
        let pos = self.highest_byte_position.load(Ordering::SeqCst);
        // The original computed `pos + len + len`, double-counting the
        // payload and rejecting writes that actually fit.
        pos + data.len() <= self.mmap.len()
    }

    /// Appends `data` to the mmap and records its location under `key`.
    /// An overwritten key's old bytes are counted as dead for compaction.
    ///
    /// # Errors
    /// Returns `WriteZero` if the mapping has no room, instead of panicking
    /// on an out-of-bounds index as the original could.
    fn write_data(&mut self, key: String, data: Vec<u8>) -> io::Result<()> {
        let data_len = data.len();
        let new_pos = self
            .highest_byte_position
            .fetch_add(data_len, Ordering::SeqCst);
        let end = new_pos + data_len;
        if end > self.mmap.len() {
            // Roll back the reservation so the cursor stays consistent.
            self.highest_byte_position.fetch_sub(data_len, Ordering::SeqCst);
            return Err(io::Error::new(
                io::ErrorKind::WriteZero,
                "mmap full: not enough room for value",
            ));
        }
        // Bulk copy instead of the original byte-by-byte indexed loop.
        self.mmap[new_pos..end].copy_from_slice(&data);
        if let Some((_old_pos, old_len)) = self.locations.insert(key, (new_pos, data_len)) {
            self.unused_bytes.fetch_add(old_len, Ordering::SeqCst);
        }
        Ok(())
    }
}
/// Creates the initial `ManagedMemory` backed by `file_path` (sized to
/// `size` bytes) and publishes it behind an `Arc<AtomicPtr<_>>` for the
/// reader/writer/compaction threads to share.
fn setup_shared_memory(file_path: &str, size: u64) -> io::Result<Arc<AtomicPtr<ManagedMemory>>> {
    println!("will setup shared_memory at {}", file_path);
    // Leak the box into a raw pointer; ownership is tracked by the AtomicPtr
    // and reclaimed later in swap_memory.
    let raw = Box::into_raw(Box::new(ManagedMemory::new(file_path, size)?));
    Ok(Arc::new(AtomicPtr::new(raw)))
}
/// Atomically replaces the published `ManagedMemory` with `new_memory` and
/// frees the previous instance.
fn swap_memory(atomic_managed_memory: &Arc<AtomicPtr<ManagedMemory>>, new_memory: ManagedMemory) {
println!("swapping managed memory pointer");
let new_memory_box = Box::new(new_memory);
// Ownership of the new instance transfers to the AtomicPtr here.
let new_memory_ptr = Box::into_raw(new_memory_box);
let old_memory_ptr = atomic_managed_memory.swap(new_memory_ptr, Ordering::SeqCst);
println!("swapped managed memory pointer");
// NOTE(review): freeing the old allocation immediately is unsound if another
// thread has already loaded the old pointer and still holds a `&ManagedMemory`
// into it (handle_request, the writer, and the sampling reader all do this) —
// safe reclamation would need epoch GC, hazard pointers, or arc-swap.
// TODO confirm this window is acceptable for the toy.
unsafe {
// drop old memory
let _ = Box::from_raw(old_memory_ptr);
}
}
/// Copies every live value from `original_file` into `new_file`, packing them
/// contiguously from offset 0, then pads the new file back out to `file_size`.
/// Returns the new file handle, the rebuilt key -> (pos, len) index, and the
/// highest written offset (the new append cursor).
///
/// # Errors
/// Fails if either file cannot be opened or if writing/sizing the new file
/// fails; per-key read failures are logged and skipped.
fn compact_file(
    original_file: &str,
    new_file: &str,
    locations: &DashMap<String, (usize, usize)>,
    file_size: u64,
) -> io::Result<(File, DashMap<String, (usize, usize)>, usize)> {
    let mut new_file = OpenOptions::new()
        .write(true)
        .read(true)
        .create(true)
        // A stale file already at this path must not leak old bytes past the
        // region we write below.
        .truncate(true)
        .open(new_file)?;
    // Open the source once; the original reopened it on every loop iteration.
    let old_file = File::open(original_file)?;
    let mut current_pos = 0;
    let new_locations = DashMap::new();
    for entry in locations.iter() {
        let (key, (pos, len)) = entry.pair();
        let mut buffer = vec![0u8; *len];
        match std::os::unix::fs::FileExt::read_exact_at(&old_file, &mut buffer, *pos as u64) {
            Ok(_) => {
                io::Write::write_all(&mut new_file, &buffer)?;
                new_locations.insert(key.clone(), (current_pos, *len));
                current_pos += len;
            }
            Err(e) => {
                // Skip unreadable entries rather than aborting the whole pass.
                eprintln!("Failed to read data for key {}: {}", key, e);
                continue;
            }
        }
    }
    println!("compacted to {}", current_pos as f32 / file_size as f32);
    // otherwise the file is only as big as you compacted to...
    new_file.set_len(file_size)?;
    Ok((new_file, new_locations, current_pos))
}
// True while the compaction thread is checking/compacting; the writer thread
// spin-waits on this flag before touching the mmap.
static COMPACTING: AtomicBool = AtomicBool::new(false);
// Monotonic version number used to name the data files (example.N.dat).
static VERSION: AtomicUsize = AtomicUsize::new(0);
// NOTE: smaller values while testing is helpful
// Capacity of each data file (and its mmap), in bytes.
static MAX_FILE_SIZE: u64 = 1024 * 1024;
// Fraction of dead bytes that triggers a compaction pass.
static COMPACTION_PERCENT_TRIGGER: f32 = 0.3;
// Seconds between compaction checks.
static COMPACTION_SLEEP_SECS: u64 = 2;
// Seconds between key-sample log lines from the reader thread.
static READ_SAMPLE_KEYS_INTERVAL_SECS: u64 = 10;
// Entry point for the toy key/value server: spawns a writer thread (applies
// queued writes to the mmap), a compaction thread (rewrites the data file when
// enough bytes are dead, then swaps the live pointer), a sampling reader
// thread, and a tiny-http server thread feeding the writer via a channel.
fn main() -> io::Result<()> {
// TODO: just always starting from afresh rather than loading
let shared_memory = setup_shared_memory(
&format!("./data/example.{}.dat", VERSION.load(Ordering::Relaxed)),
MAX_FILE_SIZE,
)?;
// All writes are serialized through this channel to the single writer thread.
let (tx, rx) = std::sync::mpsc::channel::<KeyValue>();
let sm_clone = shared_memory.clone();
// NOTE: this thread must not panic, or you have stranded the server from writes
let writer_thread = thread::spawn(move || {
// Exits when every Sender is dropped (i.e. the server thread ends).
while let Ok(KeyValue { key, value }) = rx.recv() {
println!("will write key={}, value={}", key, value);
// FIXME: poor man's write buffer is unbounded...
// Spin-wait while compaction holds the COMPACTING flag.
while COMPACTING.load(Ordering::SeqCst).eq(&true) {
println!("buffering a write due to compaction...");
thread::sleep(Duration::from_secs(1));
}
let mm_ptr = sm_clone.load(Ordering::SeqCst);
// SAFETY-relevant: see swap_memory — the instance behind this pointer may
// be freed by a concurrent swap while this reference is live.
let mm = unsafe { &mut *mm_ptr };
let data = value.into_bytes();
// FIXME: probably not what we want here
// Exponential backoff (2, 4, 8... secs) hoping compaction frees room;
// after 5 attempts the write proceeds regardless and may fail below.
let mut attempts = 0;
while !mm.room_to_write(&data) && attempts < 5 {
println!("delaying a write because file was full...");
attempts += 1;
thread::sleep(Duration::from_secs(2_i32.pow(attempts).try_into().unwrap()));
}
match mm.write_data(key.clone(), data) {
Ok(_) => (),
Err(e) => {
// NOTE: could flush the write data to a dead letter queue type log...
eprintln!("unrecoverably failed a write to {} due to {}", key, e)
}
}
}
});
let sm_clone_read = shared_memory.clone();
// Periodically measures the dead-byte fraction; past the trigger, compacts
// into a new versioned file and swaps the live pointer.
let compaction_thread = thread::spawn(move || loop {
thread::sleep(Duration::from_secs(COMPACTION_SLEEP_SECS));
// Writers are blocked for the whole check-and-compact window, even when
// the check decides not to compact.
COMPACTING.swap(true, Ordering::SeqCst);
let mm_ptr = sm_clone_read.load(Ordering::SeqCst);
let mm = unsafe { &*mm_ptr };
let total = mm.unused_bytes.load(Ordering::Relaxed) as f32 / mm.mmap_immutable.len() as f32;
if total > COMPACTION_PERCENT_TRIGGER {
let current_version = VERSION.load(Ordering::SeqCst);
// fetch_add returns the previous value, so +1 recovers the new version.
let new_version = VERSION.fetch_add(1, Ordering::SeqCst) + 1;
println!(
"will increment dat version from {} to {}",
current_version, new_version
);
let (new_file, new_locations, max_pos) = compact_file(
&format!("./data/example.{}.dat", current_version),
&format!("./data/example.{}.dat", new_version),
&mm.locations,
MAX_FILE_SIZE,
)
.expect("compacted");
let new_memory = ManagedMemory::from_compacted(new_file, new_locations, max_pos.into())
.expect("new mm");
swap_memory(&sm_clone_read, new_memory);
} else {
println!("compacting found {} dead, sleeping", total);
}
COMPACTING.swap(false, Ordering::SeqCst);
});
// this is just a thread that's continuously logging a sample of keys
let sm_clone_read = shared_memory.clone();
let reader_thread = thread::spawn(move || {
loop {
thread::sleep(Duration::from_secs(READ_SAMPLE_KEYS_INTERVAL_SECS));
let mm_ptr = sm_clone_read.load(Ordering::SeqCst);
let mm = unsafe { &*mm_ptr };
// Take (up to) the first three keys in iteration order as the sample.
let keys: Vec<_> = mm
.locations
.iter()
.take(3)
.map(|entry| entry.key().clone())
.collect();
//let keys = vec!["key1", "key2", "key3"];
for key in keys {
if let Some(entry) = mm.locations.get(&key) {
let (pos, len) = *entry;
println!(
"Read data for key: {}={:?}",
key,
String::from_utf8_lossy(&mm.mmap_immutable[pos..pos + len])
);
} else {
// Possible if a swap replaced the index between sampling and lookup.
eprintln!("expected to read {}", key)
}
}
}
});
let sm_clone_read = shared_memory.clone();
let server_addr = "0.0.0.0:6000";
let server = Server::http(server_addr).unwrap();
// HTTP accept loop; each request is handled synchronously on this thread.
thread::spawn(move || {
println!("starting tiny-http server on: {}", server_addr);
for request in server.incoming_requests() {
handle_request(request, &tx, &sm_clone_read);
}
});
// All three loops run forever, so these joins block indefinitely; main never
// actually returns Ok under normal operation.
reader_thread.join().unwrap();
compaction_thread.join().unwrap();
writer_thread.join().unwrap();
Ok(())
}
@mooreniemi
Copy link
Author

mooreniemi commented May 20, 2024

This is just a toy to play with to get intuition about how to handle writes, compaction, and swapping. It is very helpful to run it in ways that show you how these things break.

E.g. you can play with these settings:

static MAX_FILE_SIZE: u64 = 1024 * 1024;
static COMPACTION_PERCENT_TRIGGER: f32 = 0.3;

static COMPACTION_SLEEP_SECS: u64 = 2;

Server:

# set up the server as main.rs
cargo run
will setup shared_memory at ./data/example.0.dat
starting tiny-http server on: 0.0.0.0:6000
compacting found 0 dead, sleeping
will write key=KJABwwsRod, value=bZJWcnO6niV37RoMukHq
Return data for key: KJABwwsRod="bZJWcnO6niV37RoMukHq"
Received content: {"key":"m0upCWxGsI","value":"rlRXPMvNW2sel7bo6QAp"}
will write key=m0upCWxGsI, value=rlRXPMvNW2sel7bo6QAp
Return data for key: m0upCWxGsI="rlRXPMvNW2sel7bo6QAp"

Client:

# set up the client as an example in examples
cargo run --package delta_fake --example client
# for 5% edits running 100 requests takes a quarter of a millisecond
Average time for operations: 236.422µs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment