Skip to content

Instantly share code, notes, and snippets.

@xnuter
Last active November 17, 2020 04:34
Show Gist options
  • Save xnuter/1b7c50b7cd24b3c6445551c073ae0840 to your computer and use it in GitHub Desktop.
Save xnuter/1b7c50b7cd24b3c6445551c073ae0840 to your computer and use it in GitHub Desktop.
/// Model multi-thread environment, where each threads can handle
/// a single connection at a time.
async fn sync_execution(
n_workers: usize,
latency_distribution: &[u64],
n_jobs: usize,
rate_limiter: LeakyBucket,
) -> Vec<TaskStats> {
let mut threads = Vec::with_capacity(n_workers);
// partitioning to reduce contention
let queue = vec![
crossbeam::channel::bounded::<Task>(n_jobs),
crossbeam::channel::bounded::<Task>(n_jobs),
crossbeam::channel::bounded::<Task>(n_jobs),
crossbeam::channel::bounded::<Task>(n_jobs),
crossbeam::channel::bounded::<Task>(n_jobs),
crossbeam::channel::bounded::<Task>(n_jobs),
crossbeam::channel::bounded::<Task>(n_jobs),
];
static TASK_COUNTER: AtomicUsize = AtomicUsize::new(0);
for i in 0..n_workers {
let receiver = queue[i % queue.len()].1.clone();
threads.push(thread::spawn(move || {
let mut thread_stats = vec![];
for val in receiver {
sleep(Duration::from_millis(val.cost));
// report metrics
let now = Instant::now();
let stats = TaskStats {
start_time: val.start,
success: val.cost < TIMEOUT.as_millis() as u64,
completion_time: now,
overhead: now.duration_since(val.start).as_secs_f64() - val.cost as f64 / 1000.,
};
thread_stats.push(stats);
TASK_COUNTER.fetch_add(1, Ordering::Relaxed);
}
thread_stats
}));
}
println!("Starting sending tasks...");
for i in 0..n_jobs {
rate_limiter.acquire_one().await.unwrap_or_default();
let cost = latency_distribution[i % latency_distribution.len()];
let now = Instant::now();
// round-robin balancing, for simplicity
// in a real app it should be more sophisticated than this
queue[i % queue.len()]
.0
.send(Task { start: now, cost })
.unwrap();
}
println!("Waiting for completion...");
while TASK_COUNTER.load(Ordering::Relaxed) < n_jobs {
sleep(Duration::from_secs(1));
}
for q in queue {
drop(q.0);
}
let mut combined_stats = vec![];
for t in threads {
let thread_stats = t.join().unwrap();
combined_stats.extend(thread_stats);
}
combined_stats
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment