Created August 11, 2023 10:52
Fun
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use ahash::{HashMap, HashMapExt, HashSet};
use parking_lot::Mutex;
use tokio::task::JoinHandle;
use tracing::{error, info, instrument, warn};

use crate::backends::ReadBuffer;
use crate::{get_data_file, BlobId, BlobIndex, BlobInfo, FileKey, StorageBackend};
/// The configuration of the compaction system.
#[derive(Debug, Copy, Clone)]
pub struct CompactionConfig {
    /// The interval between scheduled compaction scans.
    ///
    /// You may want to increase or decrease this depending on
    /// your ingestion rate, as compactions are incremental
    /// and can require multiple passes.
    ///
    /// Defaults to `30s`.
    pub scan_interval: Duration,
    /// Whether the compaction policy needs to read the blob data
    /// in order to consider a blob for deletion.
    ///
    /// Setting this to `false` can save IO bandwidth as it avoids
    /// the read handlers.
    ///
    /// Defaults to `false`.
    pub needs_data_for_delete: bool,
}
impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            scan_interval: Duration::from_secs(30),
            needs_data_for_delete: false,
        }
    }
}
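// A brief usage sketch (illustrative values, not recommendations): a config
// for a store with a low ingestion rate, scanning every five minutes. The
// function name `low_churn_config` is hypothetical.
#[allow(dead_code)]
fn low_churn_config() -> CompactionConfig {
    CompactionConfig {
        // Compactions are incremental, so a longer interval simply means
        // fewer, larger passes.
        scan_interval: Duration::from_secs(300),
        // Leave disabled unless `can_delete` must inspect the blob bytes.
        needs_data_for_delete: false,
    }
}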
/// A merge policy tells Yorick how it should compact the storage.
///
/// It allows you to effectively mutate the data however you like,
/// but internally Yorick will pair up files based on size, aiming
/// to create gradually bigger files.
pub trait CompactionPolicy: Send + Sync + 'static {
    /// Get the current latest compaction config.
    fn get_config(&self) -> CompactionConfig {
        CompactionConfig::default()
    }

    /// Returns the file key up to which the merger is allowed to compact files.
    ///
    /// This means combining smaller files into bigger files.
    fn get_safe_compact_checkpoint(&self) -> Option<FileKey>;

    /// Returns whether a given blob can be deleted.
    ///
    /// No data will be provided if the `needs_data_for_delete` flag is not enabled in the config.
    ///
    /// This method is run in a background thread.
    fn can_delete(
        &self,
        blob_id: BlobId,
        info: BlobInfo,
        data: Option<ReadBuffer>,
    ) -> io::Result<bool>;
}
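// A minimal example policy (hypothetical, for illustration only): it allows
// the merger to compact everything below a fixed checkpoint and never
// deletes individual blobs. A real policy would derive the checkpoint from
// its own bookkeeping rather than a stored field.
#[allow(dead_code)]
struct FixedCheckpointPolicy {
    checkpoint: FileKey,
}

impl CompactionPolicy for FixedCheckpointPolicy {
    fn get_safe_compact_checkpoint(&self) -> Option<FileKey> {
        // Files at or beyond this key are left untouched by the merger.
        Some(self.checkpoint)
    }

    fn can_delete(
        &self,
        _blob_id: BlobId,
        _info: BlobInfo,
        _data: Option<ReadBuffer>,
    ) -> io::Result<bool> {
        // Never reclaim individual blobs; only files with no live blobs
        // are removed by the dead-file GC.
        Ok(false)
    }
}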
/// The compactor kill switch.
#[derive(Clone)]
pub struct CompactorKillSwitch {
    handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    tx: tachyonix::Sender<()>,
}

impl CompactorKillSwitch {
    /// Shuts the compactor down and waits for it to exit.
    pub fn shutdown_and_wait(&self) {
        self.tx.close();

        if let Some(handle) = self.handle.lock().take() {
            let _ = futures_lite::future::block_on(handle);
        }
    }

    /// Shuts the compactor down and waits for it to exit.
    ///
    /// This variant can be used from an async context.
    pub async fn shutdown_and_wait_async(&self) {
        self.tx.close();

        if let Some(handle) = self.handle.lock().take() {
            let _ = handle.await;
        }
    }
}
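// A usage sketch (hypothetical caller): tie compactor shutdown to service
// exit. `shutdown_and_wait` blocks the current thread, so prefer the async
// variant when already running inside the runtime.
#[allow(dead_code)]
async fn stop_compactor(kill_switch: CompactorKillSwitch) {
    // Closes the trigger channel, ending the run loop, then awaits the task.
    kill_switch.shutdown_and_wait_async().await;
}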
/// The manager for compacting blob files into larger files.
pub struct BlobCompactor {
    /// The compaction policy that controls the compactor.
    policy: Box<dyn CompactionPolicy>,
    /// The current blob index.
    ///
    /// This is scanned to locate potential compaction candidates.
    blob_index: BlobIndex,
    /// The storage backend system.
    backend: StorageBackend,
    /// The path where the data files are kept.
    data_path: PathBuf,
    /// The maximum file size that the compactor should produce.
    max_file_size: u64,
    /// The compaction triggers.
    triggers: tachyonix::Receiver<()>,
    /// The transmitter for sending triggers.
    notify: tachyonix::Sender<()>,
}
impl BlobCompactor {
    /// Spawns a new blob compactor with a given policy.
    #[instrument(name = "compactor-spawn", skip(policy, blob_index, backend))]
    pub(crate) async fn spawn(
        policy: Box<dyn CompactionPolicy>,
        blob_index: BlobIndex,
        data_path: &Path,
        max_file_size: u64,
        backend: StorageBackend,
    ) -> CompactorKillSwitch {
        let (tx, triggers) = tachyonix::channel(10);

        let actor = Self {
            policy,
            blob_index,
            data_path: data_path.to_path_buf(),
            max_file_size,
            backend,
            triggers,
            notify: tx.clone(),
        };

        let handle = tokio::spawn(actor.run());

        CompactorKillSwitch {
            tx,
            handle: Arc::new(Mutex::new(Some(handle))),
        }
    }
#[instrument("compactor", skip_all)] | |
async fn run(mut self) { | |
self.schedule_compact(); | |
info!("Compactor is ready"); | |
while let Ok(()) = self.triggers.recv().await { | |
let start = Instant::now(); | |
match self.run_compaction().await { | |
Ok(num_bytes) => { | |
info!( | |
reclaimed_bytes = num_bytes, | |
reclaimed_bytes_pretty = %humansize::format_size(num_bytes, humansize::DECIMAL), | |
elapsed = ?start.elapsed(), | |
"Compaction completed" | |
); | |
}, | |
Err(e) => { | |
error!(error = ?e, "Failed to run compaction due to error."); | |
}, | |
} | |
self.schedule_compact(); | |
// Drain any events that we might have collected while merging. | |
while let Ok(()) = self.triggers.try_recv() { | |
continue | |
} | |
} | |
info!("Compactor has shutdown"); | |
} | |
    /// Schedules a compaction cycle trigger.
    fn schedule_compact(&self) {
        let config = self.policy.get_config();
        let tx = self.notify.clone();

        tokio::spawn(async move {
            tokio::time::sleep(config.scan_interval).await;
            let _ = tx.send(()).await;
        });
    }
    async fn run_compaction(&mut self) -> io::Result<u64> {
        let file_key = match self.policy.get_safe_compact_checkpoint() {
            None => return Ok(0),
            Some(key) => key,
        };

        let path = self.data_path.clone();
        let files = tokio::task::spawn_blocking(move || {
            get_current_file_sizes_before(&path, file_key)
        })
        .await
        .expect("Spawn background thread")?;

        let merge_buckets = self.get_merge_buckets(file_key);

        info!(num_files = files.len(), "Files met compaction criteria");

        let num_bytes_cleaned = {
            let path = self.data_path.clone();
            let merge_buckets = merge_buckets.clone();
            let files = files.clone();
            tokio::task::spawn_blocking(move || {
                clean_dead_files(&path, &files, &merge_buckets)
            })
            .await
            .expect("Spawn background thread")?
        };

        Ok(num_bytes_cleaned)
    }
    fn get_merge_buckets(&self, before: FileKey) -> Vec<MergeBucket> {
        let mut buckets = HashMap::new();

        let reader = self.blob_index.reader();
        let guard = reader.enter();
        for (id, info) in guard.iter().flatten() {
            let info = info.get_one().unwrap();

            // Only files older than the safe checkpoint may be compacted.
            if info.file_key >= before {
                continue;
            }

            // Group blobs by the file they live in, tracking the
            // approximate total size of each group.
            let bucket = buckets
                .entry((info.file_key, info.group_id))
                .or_insert_with(|| MergeBucket {
                    group_id: info.group_id,
                    file_key: info.file_key,
                    total_size_approx: 0,
                    blobs: Vec::with_capacity(1),
                });
            bucket.total_size_approx += info.len() as u64;
            bucket.blobs.push(*id);
        }

        // Collect and sort the buckets, smallest first within each group.
        let mut buckets: Vec<MergeBucket> = buckets.into_values().collect();
        buckets.sort_by_key(|bucket| (bucket.group_id, bucket.total_size_approx));
        buckets
    }
}
/// A single merge operation bucket.
#[derive(Debug, Clone)]
struct MergeBucket {
    /// The group ID of the bucket.
    group_id: u64,
    /// The file key of the bucket.
    file_key: FileKey,
    /// The approximate size in bytes.
    total_size_approx: u64,
    /// The blobs that are part of that file.
    blobs: Vec<BlobId>,
}
#[instrument("compactor-file-finder")] | |
/// Creates a list of all files currently in the data directory | |
/// which are produced before the given `FileKey`. | |
fn get_current_file_sizes_before( | |
path: &Path, | |
newest_file_key: FileKey, | |
) -> io::Result<Vec<(FileKey, u64)>> { | |
let dir = path.read_dir()?; | |
let mut files = Vec::new(); | |
for entry in dir { | |
let entry = entry?; | |
let path = entry.path(); | |
let metadata = path.metadata()?; | |
if metadata.is_dir() { | |
warn!(path = %path.display(), "Ignoring unknown folder in data folder"); | |
continue; | |
} | |
let file_key = match crate::tools::parse_data_file_name(&path) { | |
Some(file_key) => file_key, | |
None => { | |
warn!(path = %path.display(), "Ignoring unknown file"); | |
continue; | |
}, | |
}; | |
if file_key < newest_file_key { | |
files.push((file_key, metadata.len())); | |
} | |
} | |
Ok(files) | |
} | |
#[instrument("dead-file-gc", skip(files, buckets))] | |
/// Removes any files which contain no data currently in the index. | |
fn clean_dead_files(data_path: &Path, files: &[(FileKey, u64)], buckets: &[MergeBucket]) -> io::Result<u64> { | |
let mut lookup = HashSet::from_iter(files.iter().map(|v| v.0)); | |
// Remove any files which exist in our index. | |
for bucket in buckets { | |
lookup.remove(&bucket.file_key); | |
} | |
let mut num_bytes_cleaned = 0; | |
for (key, size) in files { | |
if !lookup.contains(key) { | |
continue | |
} | |
let path = get_data_file(&data_path, *key); | |
match std::fs::remove_file(&path) { | |
Err(e) => { | |
warn!(path = %path.display(), error = ?e, "Failed to remove dead file due to error"); | |
continue | |
}, | |
Ok(()) => { | |
num_bytes_cleaned += size; | |
info!(path = %path.display(), "Removed dead file"); | |
}, | |
} | |
} | |
info!( | |
num_dead_files = lookup.len(), | |
reclaimed_bytes = num_bytes_cleaned, | |
reclaimed_bytes_pretty = %humansize::format_size(num_bytes_cleaned, humansize::DECIMAL), | |
"Dead files have been cleaned up" | |
); | |
Ok(0) | |
} |