Last active
November 17, 2020 04:34
-
-
Save xnuter/1b7c50b7cd24b3c6445551c073ae0840 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// Model multi-thread environment, where each threads can handle | |
/// a single connection at a time. | |
async fn sync_execution( | |
n_workers: usize, | |
latency_distribution: &[u64], | |
n_jobs: usize, | |
rate_limiter: LeakyBucket, | |
) -> Vec<TaskStats> { | |
let mut threads = Vec::with_capacity(n_workers); | |
// partitioning to reduce contention | |
let queue = vec![ | |
crossbeam::channel::bounded::<Task>(n_jobs), | |
crossbeam::channel::bounded::<Task>(n_jobs), | |
crossbeam::channel::bounded::<Task>(n_jobs), | |
crossbeam::channel::bounded::<Task>(n_jobs), | |
crossbeam::channel::bounded::<Task>(n_jobs), | |
crossbeam::channel::bounded::<Task>(n_jobs), | |
crossbeam::channel::bounded::<Task>(n_jobs), | |
]; | |
static TASK_COUNTER: AtomicUsize = AtomicUsize::new(0); | |
for i in 0..n_workers { | |
let receiver = queue[i % queue.len()].1.clone(); | |
threads.push(thread::spawn(move || { | |
let mut thread_stats = vec![]; | |
for val in receiver { | |
sleep(Duration::from_millis(val.cost)); | |
// report metrics | |
let now = Instant::now(); | |
let stats = TaskStats { | |
start_time: val.start, | |
success: val.cost < TIMEOUT.as_millis() as u64, | |
completion_time: now, | |
overhead: now.duration_since(val.start).as_secs_f64() - val.cost as f64 / 1000., | |
}; | |
thread_stats.push(stats); | |
TASK_COUNTER.fetch_add(1, Ordering::Relaxed); | |
} | |
thread_stats | |
})); | |
} | |
println!("Starting sending tasks..."); | |
for i in 0..n_jobs { | |
rate_limiter.acquire_one().await.unwrap_or_default(); | |
let cost = latency_distribution[i % latency_distribution.len()]; | |
let now = Instant::now(); | |
// round-robin balancing, for simplicity | |
// in a real app it should be more sophisticated than this | |
queue[i % queue.len()] | |
.0 | |
.send(Task { start: now, cost }) | |
.unwrap(); | |
} | |
println!("Waiting for completion..."); | |
while TASK_COUNTER.load(Ordering::Relaxed) < n_jobs { | |
sleep(Duration::from_secs(1)); | |
} | |
for q in queue { | |
drop(q.0); | |
} | |
let mut combined_stats = vec![]; | |
for t in threads { | |
let thread_stats = t.join().unwrap(); | |
combined_stats.extend(thread_stats); | |
} | |
combined_stats | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment