Skip to content

Instantly share code, notes, and snippets.

@toksdotdev
Created March 11, 2023 00:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save toksdotdev/5c2f2e8f808d29dea75facfa3308ab64 to your computer and use it in GitHub Desktop.
Save toksdotdev/5c2f2e8f808d29dea75facfa3308ab64 to your computer and use it in GitHub Desktop.
use super::RoundRobinStrategy;
use crate::balancer::StrategyProducer;
use crate::Target;
use futures::future::abortable;
use futures::stream::AbortHandle;
use futures::stream::Abortable;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tokio::time;
/// A collection that can asynchronously re-order ("rank") its own contents
/// in place.
#[async_trait::async_trait]
pub trait Rank {
/// Element type associated with the implementor. It is not referenced by
/// [`Rank::rank`] itself.
type Item;
/// Rank values within self based on a property.
///
/// The ranking happens in place through `&mut self`; nothing is returned.
async fn rank(&mut self);
}
/// `Rank` implementation for a plain list of targets.
///
/// NOTE(review): this is still a stub. Calling `rank` hits `todo!()` and
/// panics, so any caller that invokes it (for example a periodic re-ranking
/// task) will fail until a real ordering is implemented.
#[async_trait::async_trait]
impl Rank for Vec<Target> {
type Item = Target;
/// Not implemented yet; always panics via `todo!()`.
async fn rank(&mut self) {
todo!()
}
}
/// Produces round-robin strategies from one of two copies ("buckets") of the
/// same collection.
///
/// A background task periodically flips which bucket readers use and re-ranks
/// the bucket that readers have just been switched away from, so ranking and
/// reading normally operate on different copies.
pub struct OptimalRoundRobinProducer<T> {
    /// First copy of the collection.
    left_bucket: Arc<Mutex<T>>,
    /// Second copy of the collection.
    right_bucket: Arc<Mutex<T>>,
    /// Next round-robin start index handed out while reading the left bucket.
    left_bucket_start_index: Arc<AtomicUsize>,
    /// Next round-robin start index handed out while reading the right bucket.
    right_bucket_start_index: Arc<AtomicUsize>,
    /// Toggle to switch from left bucket to right bucket.
    ///
    /// Setting the toggle to true causes values to be picked from
    /// the left bucket.
    read_from_left_bucket: Arc<AtomicBool>,
    /// Join handle of the background ranking task, paired with the handle
    /// that aborts the task's future. The abort handle stays in position `.1`
    /// so existing users of `handle.1` keep working.
    handle: (JoinHandle<()>, AbortHandle),
}

impl<T> OptimalRoundRobinProducer<T>
where
    T: Rank + Send + Sync + Clone + 'static,
{
    /// Period between bucket swaps used by [`Self::new`].
    const DEFAULT_RANK_INTERVAL: Duration = Duration::from_secs(10);

    /// Creates a producer that swaps and re-ranks its buckets every 10 seconds.
    ///
    /// Spawns a Tokio task, so it must be called from within a Tokio runtime.
    pub fn new(haystack: T) -> Self {
        Self::with_interval(haystack, Self::DEFAULT_RANK_INTERVAL)
    }

    /// Creates a producer that swaps and re-ranks its buckets every `period`.
    ///
    /// `haystack` is cloned once so that each bucket owns its own copy.
    /// Spawns a Tokio task, so it must be called from within a Tokio runtime.
    ///
    /// # Panics
    ///
    /// Panics if `period` is zero. Tokio's `interval` rejects a zero period,
    /// and failing here is louder than failing inside the background task.
    pub fn with_interval(haystack: T, period: Duration) -> Self {
        assert!(!period.is_zero(), "ranking interval must be non-zero");

        let left_bucket = Arc::new(Mutex::new(haystack.clone()));
        let right_bucket = Arc::new(Mutex::new(haystack));
        let left_bucket_start_index = Arc::new(AtomicUsize::new(0));
        let right_bucket_start_index = Arc::new(AtomicUsize::new(0));
        let read_from_left_bucket = Arc::new(AtomicBool::new(true));

        // Make the ranking loop itself abortable and spawn *that*. Wrapping the
        // `JoinHandle` returned by `tokio::spawn` instead would only cancel
        // waiting on the handle; the spawned loop would keep running forever.
        let (arrange, abort_handle): (Abortable<_>, AbortHandle) =
            abortable(Self::periodically_arrange(
                Arc::clone(&left_bucket),
                Arc::clone(&right_bucket),
                Arc::clone(&left_bucket_start_index),
                Arc::clone(&right_bucket_start_index),
                Arc::clone(&read_from_left_bucket),
                period,
            ));
        let join_handle = tokio::spawn(async move {
            // The loop never finishes on its own; an `Err(Aborted)` here only
            // means the abort handle was triggered.
            let _ = arrange.await;
        });

        Self {
            left_bucket_start_index,
            right_bucket_start_index,
            read_from_left_bucket,
            left_bucket,
            right_bucket,
            handle: (join_handle, abort_handle),
        }
    }

    /// Returns a clone of the bucket currently selected for reading together
    /// with that bucket's shared start-index counter.
    ///
    /// # Panics
    ///
    /// NOTE(review): this uses `Mutex::blocking_lock`, which Tokio documents
    /// as panicking when called from within an asynchronous execution context.
    /// Callers running on a runtime worker should go through
    /// `spawn_blocking` or equivalent — confirm against the call sites.
    fn get_bucket(&self) -> (T, Arc<AtomicUsize>) {
        let (bucket, start_index) = if self.read_from_left_bucket.load(Ordering::SeqCst) {
            (&self.left_bucket, &self.left_bucket_start_index)
        } else {
            (&self.right_bucket, &self.right_bucket_start_index)
        };
        (bucket.blocking_lock().clone(), Arc::clone(start_index))
    }

    /// Background loop: on every tick, flips the read toggle, resets the start
    /// index of the bucket readers were switched to, and re-ranks the bucket
    /// they were switched away from.
    ///
    /// Never returns on its own; it ends only when its future is aborted or
    /// dropped.
    async fn periodically_arrange(
        left_bucket: Arc<Mutex<T>>,
        right_bucket: Arc<Mutex<T>>,
        left_bucket_start_index: Arc<AtomicUsize>,
        right_bucket_start_index: Arc<AtomicUsize>,
        read_from_left_bucket: Arc<AtomicBool>,
        period: Duration,
    ) {
        // Per the Tokio docs the first tick of an interval completes
        // immediately, so the first two iterations run back to back and each
        // bucket is ranked once before the periodic cadence starts.
        let mut interval = time::interval(period);
        loop {
            // `fetch_xor(true)` flips the toggle and returns its previous
            // value: `true` means readers were on the left bucket and have
            // just been moved to the right one (and vice versa).
            let was_reading_left = read_from_left_bucket.fetch_xor(true, Ordering::SeqCst);
            if was_reading_left {
                right_bucket_start_index.store(0, Ordering::SeqCst);
                left_bucket.lock().await.rank().await;
            } else {
                left_bucket_start_index.store(0, Ordering::SeqCst);
                right_bucket.lock().await.rank().await;
            }
            interval.tick().await;
        }
    }
}
impl StrategyProducer<Target> for OptimalRoundRobinProducer<Vec<Target>> {
    /// Builds a round-robin strategy over a snapshot of the bucket currently
    /// selected for reading.
    ///
    /// Each call hands out the next start index for that bucket, wrapping back
    /// to 0 after the last element, so successive strategies begin at
    /// successive targets.
    fn produce(&self) -> Box<dyn crate::balancer::Strategy<Item = Target>> {
        let (bucket, start_index) = self.get_bucket();
        let len = bucket.len();

        // Advance and wrap in ONE atomic step. A separate "reset if == len"
        // followed by `fetch_add` lets concurrent callers push the counter past
        // `len`, after which the reset never matches again and the index grows
        // without bound.
        let advance = |current: usize| {
            let next = current.saturating_add(1);
            Some(if next >= len { 0 } else { next })
        };
        let previous =
            match start_index.fetch_update(Ordering::SeqCst, Ordering::SeqCst, advance) {
                Ok(previous) | Err(previous) => previous,
            };
        // Guard against a stale counter and against an empty bucket.
        let start = if previous < len { previous } else { 0 };

        Box::new(RoundRobinStrategy::new(start, Arc::new(bucket)))
    }
}
impl<T> Drop for OptimalRoundRobinProducer<T> {
    /// Triggers the abort handle stored in `handle` when the producer is
    /// dropped.
    fn drop(&mut self) {
        let (_, abort_handle) = &self.handle;
        abort_handle.abort();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment