@ChillFish8
Created August 11, 2023 10:52
Fun

use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};

use ahash::{HashMap, HashMapExt, HashSet};
use parking_lot::Mutex;
use tokio::task::JoinHandle;
use tracing::{error, info, instrument, warn};

use crate::backends::ReadBuffer;
use crate::{BlobId, BlobIndex, BlobInfo, FileKey, get_data_file, StorageBackend};

/// The configuration of the compaction system.
#[derive(Debug, Copy, Clone)]
pub struct CompactionConfig {
    /// The duration between scans being scheduled.
    ///
    /// You may want to increase/decrease the duration depending on
    /// your ingestion rate, as compactions are incremental
    /// and can require multiple passes.
    ///
    /// Defaults to `30s`.
    pub scan_interval: Duration,
    /// Whether the compaction policy needs to read
    /// the blob data in order to consider a blob for deletion.
    ///
    /// Setting this to `false` can save IO bandwidth as it avoids
    /// the read handlers.
    ///
    /// Defaults to `false`.
    pub needs_data_for_delete: bool,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            scan_interval: Duration::from_secs(30),
            needs_data_for_delete: false,
        }
    }
}

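// Example (not part of the original gist): a minimal sketch of building a
// custom `CompactionConfig` with struct-update syntax. The 120s interval is
// an illustrative value, not a recommendation from the gist.
#[allow(dead_code)]
fn example_custom_config() -> CompactionConfig {
    CompactionConfig {
        // Scan less frequently for a low ingestion rate.
        scan_interval: Duration::from_secs(120),
        ..Default::default()
    }
}
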
/// A merge policy tells Yorick how it should compact the storage.
///
/// It allows you to effectively mutate the data however you like,
/// but internally yorick will pair up files based on size, aiming
/// to create gradually bigger files.
pub trait CompactionPolicy: Send + Sync + 'static {
    /// Get the current latest compaction config.
    fn get_config(&self) -> CompactionConfig {
        CompactionConfig::default()
    }

    /// Returns the file key which the merger is allowed to consider compacting files up to.
    ///
    /// This means combining smaller files into bigger files.
    fn get_safe_compact_checkpoint(&self) -> Option<FileKey>;

    /// Returns whether a given blob can be deleted.
    ///
    /// No data will be provided if the `needs_data_for_delete` flag is not enabled in the config.
    ///
    /// This method is run in a background thread.
    fn can_delete(
        &self,
        blob_id: BlobId,
        info: BlobInfo,
        data: Option<ReadBuffer>,
    ) -> io::Result<bool>;
}

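// Example (not part of the original gist): a minimal sketch of a
// `CompactionPolicy` implementation, assuming `FileKey` is a small `Copy`
// key type. The fixed checkpoint and the "never delete" rule are
// illustrative assumptions only.
#[allow(dead_code)]
struct KeepEverythingPolicy {
    /// Files strictly older than this key may be considered for compaction.
    checkpoint: FileKey,
}

impl CompactionPolicy for KeepEverythingPolicy {
    fn get_safe_compact_checkpoint(&self) -> Option<FileKey> {
        // Allow the compactor to merge everything up to the fixed checkpoint.
        Some(self.checkpoint)
    }

    fn can_delete(
        &self,
        _blob_id: BlobId,
        _info: BlobInfo,
        _data: Option<ReadBuffer>,
    ) -> io::Result<bool> {
        // This sketch never deletes blobs; it only lets files be merged.
        Ok(false)
    }
}
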
/// The compactor kill switch.
#[derive(Clone)]
pub struct CompactorKillSwitch {
    handle: Arc<Mutex<Option<JoinHandle<()>>>>,
    tx: tachyonix::Sender<()>,
}

impl CompactorKillSwitch {
    /// Shuts the compactor down and waits for it to exit.
    pub fn shutdown_and_wait(&self) {
        self.tx.close();
        if let Some(handle) = self.handle.lock().take() {
            let _ = futures_lite::future::block_on(handle);
        }
    }

    /// Shuts the compactor down and waits for it to exit.
    ///
    /// This variant can be used from an async context.
    pub async fn shutdown_and_wait_async(&self) {
        self.tx.close();
        if let Some(handle) = self.handle.lock().take() {
            let _ = handle.await;
        }
    }
}

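// Example (not part of the original gist): a hedged sketch of shutting the
// compactor down from async code. Prefer the async variant there so a runtime
// worker thread is not blocked waiting on the task handle.
#[allow(dead_code)]
async fn example_shutdown(kill_switch: CompactorKillSwitch) {
    // Closes the trigger channel and waits for the compactor task to exit.
    kill_switch.shutdown_and_wait_async().await;
}
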
/// The manager for compacting blob files into larger files.
pub struct BlobCompactor {
    /// The compaction policy that controls the compactor.
    policy: Box<dyn CompactionPolicy>,
    /// The current blob index.
    ///
    /// This is scanned to locate potential compaction candidates.
    blob_index: BlobIndex,
    /// The storage backend system.
    backend: StorageBackend,
    /// The path where the data files are kept.
    data_path: PathBuf,
    /// The maximum file size that the compactor should produce.
    max_file_size: u64,
    /// The compaction triggers.
    triggers: tachyonix::Receiver<()>,
    /// The transmitter for sending triggers.
    notify: tachyonix::Sender<()>,
}

impl BlobCompactor {
    /// Spawns a new blob compactor with a given policy.
    #[instrument(name = "compactor-spawn", skip(policy, blob_index, backend))]
    pub(crate) async fn spawn(
        policy: Box<dyn CompactionPolicy>,
        blob_index: BlobIndex,
        data_path: &Path,
        max_file_size: u64,
        backend: StorageBackend,
    ) -> CompactorKillSwitch {
        let (tx, triggers) = tachyonix::channel(10);

        let actor = Self {
            policy,
            blob_index,
            data_path: data_path.to_path_buf(),
            max_file_size,
            backend,
            triggers,
            notify: tx.clone(),
        };

        let handle = tokio::spawn(actor.run());

        CompactorKillSwitch {
            tx,
            handle: Arc::new(Mutex::new(Some(handle))),
        }
    }

    #[instrument(name = "compactor", skip_all)]
    async fn run(mut self) {
        self.schedule_compact();

        info!("Compactor is ready");
        while let Ok(()) = self.triggers.recv().await {
            let start = Instant::now();

            match self.run_compaction().await {
                Ok(num_bytes) => {
                    info!(
                        reclaimed_bytes = num_bytes,
                        reclaimed_bytes_pretty = %humansize::format_size(num_bytes, humansize::DECIMAL),
                        elapsed = ?start.elapsed(),
                        "Compaction completed"
                    );
                },
                Err(e) => {
                    error!(error = ?e, "Failed to run compaction due to error.");
                },
            }

            self.schedule_compact();

            // Drain any events that we might have collected while merging.
            while let Ok(()) = self.triggers.try_recv() {
                continue
            }
        }

        info!("Compactor has shut down");
    }

    /// Schedules a compaction cycle trigger.
    fn schedule_compact(&self) {
        let config = self.policy.get_config();
        let tx = self.notify.clone();
        tokio::spawn(async move {
            tokio::time::sleep(config.scan_interval).await;
            let _ = tx.send(()).await;
        });
    }

    async fn run_compaction(&mut self) -> io::Result<u64> {
        let file_key = match self.policy.get_safe_compact_checkpoint() {
            None => return Ok(0),
            Some(key) => key,
        };

        let path = self.data_path.clone();
        let files = tokio::task::spawn_blocking(move || {
            get_current_file_sizes_before(&path, file_key)
        })
        .await
        .expect("Spawn background thread")?;

        let merge_buckets = self.get_merge_buckets(file_key);

        info!(
            num_files = files.len(),
            "{} files met compaction criteria",
            files.len()
        );

        let num_bytes_cleaned = {
            let path = self.data_path.clone();
            let merge_buckets = merge_buckets.clone();
            let files = files.clone();
            tokio::task::spawn_blocking(move || {
                clean_dead_files(&path, &files, &merge_buckets)
            })
            .await
            .expect("Spawn background thread")?
        };

        Ok(num_bytes_cleaned)
    }

    fn get_merge_buckets(&self, before: FileKey) -> Vec<MergeBucket> {
        let mut buckets = HashMap::new();

        let reader = self.blob_index.reader();
        let guard = reader.enter();
        for (id, info) in guard.iter().flatten() {
            let info = info.get_one().unwrap();

            if info.file_key >= before {
                continue
            }

            let bucket = buckets
                .entry((info.file_key, info.group_id))
                .or_insert_with(|| MergeBucket {
                    group_id: info.group_id,
                    file_key: info.file_key,
                    total_size_approx: 0,
                    blobs: Vec::with_capacity(1),
                });

            bucket.total_size_approx += info.len() as u64;
            bucket.blobs.push(*id);
        }

        // Collect and group the data.
        let mut buckets: Vec<MergeBucket> = buckets.into_values().collect();
        buckets.sort_by_key(|bucket| (bucket.group_id, bucket.total_size_approx));
        buckets
    }
}

/// A single merge operation bucket.
#[derive(Debug, Clone)]
struct MergeBucket {
    /// The group ID of the bucket.
    group_id: u64,
    /// The file key of the bucket.
    file_key: FileKey,
    /// The approximate size of the bucket in bytes.
    total_size_approx: u64,
    /// The blobs that are part of that file.
    blobs: Vec<BlobId>,
}

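// Example (not part of the original gist): a hedged sketch of how the sorted
// buckets *might* be packed into merge operations that stay under a target
// file size, in the spirit of the trait docs ("pair up files based on size,
// aiming to create gradually bigger files"). The actual merge step is not
// implemented in this gist; `plan_merges` is illustrative only and assumes
// `FileKey: Copy`.
#[allow(dead_code)]
fn plan_merges(buckets: &[MergeBucket], max_file_size: u64) -> Vec<Vec<FileKey>> {
    let mut plans = Vec::new();
    let mut current: Vec<FileKey> = Vec::new();
    let mut current_group: Option<u64> = None;
    let mut current_size = 0u64;

    // Buckets arrive sorted by (group_id, total_size_approx), so a greedy pass
    // groups neighbouring small buckets of the same group together.
    for bucket in buckets {
        let group_changed =
            current_group.is_some() && current_group != Some(bucket.group_id);
        let too_big = current_size + bucket.total_size_approx > max_file_size;

        if !current.is_empty() && (group_changed || too_big) {
            // Only emit plans that actually combine more than one file;
            // rewriting a single file on its own gains nothing.
            if current.len() > 1 {
                plans.push(std::mem::take(&mut current));
            } else {
                current.clear();
            }
            current_size = 0;
        }

        current.push(bucket.file_key);
        current_group = Some(bucket.group_id);
        current_size += bucket.total_size_approx;
    }

    if current.len() > 1 {
        plans.push(current);
    }

    plans
}
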
#[instrument("compactor-file-finder")]
/// Creates a list of all files currently in the data directory
/// which are produced before the given `FileKey`.
fn get_current_file_sizes_before(
path: &Path,
newest_file_key: FileKey,
) -> io::Result<Vec<(FileKey, u64)>> {
let dir = path.read_dir()?;
let mut files = Vec::new();
for entry in dir {
let entry = entry?;
let path = entry.path();
let metadata = path.metadata()?;
if metadata.is_dir() {
warn!(path = %path.display(), "Ignoring unknown folder in data folder");
continue;
}
let file_key = match crate::tools::parse_data_file_name(&path) {
Some(file_key) => file_key,
None => {
warn!(path = %path.display(), "Ignoring unknown file");
continue;
},
};
if file_key < newest_file_key {
files.push((file_key, metadata.len()));
}
}
Ok(files)
}
#[instrument("dead-file-gc", skip(files, buckets))]
/// Removes any files which contain no data currently in the index.
fn clean_dead_files(data_path: &Path, files: &[(FileKey, u64)], buckets: &[MergeBucket]) -> io::Result<u64> {
let mut lookup = HashSet::from_iter(files.iter().map(|v| v.0));
// Remove any files which exist in our index.
for bucket in buckets {
lookup.remove(&bucket.file_key);
}
let mut num_bytes_cleaned = 0;
for (key, size) in files {
if !lookup.contains(key) {
continue
}
let path = get_data_file(&data_path, *key);
match std::fs::remove_file(&path) {
Err(e) => {
warn!(path = %path.display(), error = ?e, "Failed to remove dead file due to error");
continue
},
Ok(()) => {
num_bytes_cleaned += size;
info!(path = %path.display(), "Removed dead file");
},
}
}
info!(
num_dead_files = lookup.len(),
reclaimed_bytes = num_bytes_cleaned,
reclaimed_bytes_pretty = %humansize::format_size(num_bytes_cleaned, humansize::DECIMAL),
"Dead files have been cleaned up"
);
Ok(0)
}