Skip to content

Instantly share code, notes, and snippets.

@jonathanstrong
Created March 14, 2018 02:42
Show Gist options
  • Save jonathanstrong/d88d474f5b0ea147e5bc98d9df0910a2 to your computer and use it in GitHub Desktop.
Save jonathanstrong/d88d474f5b0ea147e5bc98d9df0910a2 to your computer and use it in GitHub Desktop.
multiqueue bench
#![feature(duration_from_micros)]
extern crate hdrhistogram;
extern crate multiqueue;
extern crate chrono;
extern crate libc;
extern crate hwloc; // tested with v0.3
extern crate rand;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Sender, Receiver, SendError, channel, TryRecvError, TrySendError};
use std::thread::{self, JoinHandle};
use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH};
use std::{io, fs};
use rand::distributions::{Range, Sample};
use hdrhistogram::Histogram;
use hdrhistogram::serialization::{Serializer, V2DeflateSerializer, V2Serializer};
use hdrhistogram::serialization::interval_log::{IntervalLogWriterBuilder, Tag};
use hwloc::{Topology, ObjectType, CPUBIND_THREAD, CpuSet};
use chrono::{DateTime, Utc};
/// Returns the pthread handle of the calling thread.
fn get_thread_id() -> libc::pthread_t {
    // SAFETY: `pthread_self` takes no arguments and only reports the caller's own handle.
    unsafe {
        libc::pthread_self()
    }
}
/// Binds the calling thread to the core at index `core_num` of the topology.
///
/// The cpuset is taken from the `core_num`-th object of type `ObjectType::Core`,
/// passed through `singlify`, and applied to the current thread with
/// `CPUBIND_THREAD`.
///
/// # Panics
///
/// Panics if the topology mutex is poisoned, if no core with a cpuset is found
/// at index `core_num`, or if the bind call returns an error.
pub fn bind_thread(topo: Arc<Mutex<Topology>>, core_num: usize) {
    let tid = get_thread_id();
    let mut lock = topo.lock().unwrap();
    let mut core = lock
        .objects_with_type(&ObjectType::Core)
        .ok()
        .and_then(|cores| cores.get(core_num).and_then(|core| core.cpuset()))
        .unwrap_or_else(|| panic!("failed to get core {}", core_num));
    // NOTE(review): per hwloc, `singlify` keeps a single index of the set so the
    // thread cannot migrate within the core -- confirm against the hwloc docs.
    core.singlify();
    // Surface the underlying error and the core index instead of discarding them.
    lock.set_cpubind_for_thread(tid, core, CPUBIND_THREAD)
        .unwrap_or_else(|e| panic!("failed to bind thread to core {}: {:?}", core_num, e));
}
/// This was used to plug many different queue types into the benchmarks.
///
/// An implementor accepts one item per `send` call and reports the outcome
/// through its own error type.
trait Puts {
    /// The type of value handed to the queue.
    type Item;
    /// The error returned when an item could not be enqueued.
    type Err;
    /// Attempts to enqueue `item`.
    // The parameter is named: anonymous trait parameters are rejected from edition 2018 on.
    fn send(&self, item: Self::Item) -> Result<(), Self::Err>;
}
/// Adapts `multiqueue::BroadcastSender` to `Puts` by forwarding to its `try_send`.
impl<T: Clone> Puts for multiqueue::BroadcastSender<T> {
    type Item = T;
    type Err = TrySendError<T>;

    fn send(&self, item: T) -> Result<(), TrySendError<T>> {
        // Straight delegation; the `try_send` result is returned unchanged.
        self.try_send(item)
    }
}
/// Spawns a thread that pushes freshly built items into `tx` at random intervals.
///
/// Until `term` has elapsed, the thread repeatedly:
/// 1. builds an item with `new_item`,
/// 2. retries `tx.send` in a tight loop for as long as it reports
///    `TrySendError::Full` (each retry is counted as a "spin"),
/// 3. sleeps for a number of microseconds sampled from
///    `Range::new(min_delay, max_delay)`.
///
/// If `min_delay >= max_delay` the range is degenerate (`Range::new` would
/// panic), so the thread sleeps a fixed `min_delay` microseconds instead.
///
/// When the loop ends, `tx` is dropped, `done` is incremented by one, and the
/// total spin count is printed if it is non-zero.
///
/// Any `send` outcome other than `Full` (success, or any other error) ends the
/// retry loop for that item; the item is not re-sent.
fn sporadic_sender_with_signal<P, I, F>(
    tx: P,
    new_item: F,
    term: Duration,
    min_delay: u64,
    max_delay: u64,
    done: Arc<AtomicUsize>,
) -> JoinHandle<()>
where
    P: Puts<Item = I, Err = TrySendError<I>> + Send + 'static,
    F: Fn() -> I + Send + 'static,
{
    thread::spawn(move || {
        let start = Instant::now();
        let mut rng = rand::thread_rng();
        // `Range::new` requires low < high; fall back to a fixed delay otherwise.
        let mut delay_range = if min_delay < max_delay {
            Some(Range::new(min_delay, max_delay))
        } else {
            None
        };
        // Explicit u64: an untyped counter defaults to i32, which a long
        // busy-spin against a full queue could overflow.
        let mut spins: u64 = 0;
        while start.elapsed() < term {
            let mut pending = new_item();
            loop {
                match tx.send(pending) {
                    Err(TrySendError::Full(returned)) => {
                        // Queue is full: take the item back and try again.
                        spins += 1;
                        pending = returned;
                    }
                    // Sent, or failed for a reason other than `Full`: move on.
                    _ => break,
                }
            }
            let delay_micros = match delay_range {
                Some(ref mut range) => range.sample(&mut rng),
                None => min_delay,
            };
            thread::sleep(Duration::from_micros(delay_micros));
        }
        // Release the sender before signalling completion.
        drop(tx);
        done.fetch_add(1, Ordering::Relaxed);
        if spins > 0 {
            println!("sporadic sender lifetime spins: {}", spins);
        }
    })
}
/// Spawns a thread that aggregates `u64` samples from `rx` into an HDR
/// histogram and appends it to an interval log file.
///
/// The log is written to `hist-log-<unix-seconds>.v2z` in the current
/// directory, using the deflate V2 serializer. Samples are accumulated until
/// more than `freq` has passed since the previous write (checked each time a
/// sample arrives); the histogram is then written with `tag` and cleared.
///
/// The thread stops when it receives `None` (the terminate command) or when
/// every sender has been dropped. Samples recorded since the last write are
/// flushed as a final interval before the thread exits.
///
/// # Panics
///
/// The spawned thread panics if the system clock is before the Unix epoch,
/// if the log file cannot be created, if the log header cannot be written, or
/// if a sample cannot be recorded.
fn hist_log(rx: Receiver<Option<u64>>, freq: Duration, tag: &'static str) -> JoinHandle<()> {
    thread::spawn(move || {
        let mut ser = V2DeflateSerializer::new();
        let start_time = SystemTime::now();
        let seconds = start_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
        let file = fs::File::create(&format!("hist-log-{}.v2z", seconds)).unwrap();
        let mut buf = io::LineWriter::new(file);
        let mut hist = Histogram::<u64>::new(3).unwrap();
        let mut wtr = IntervalLogWriterBuilder::new()
            .with_base_time(UNIX_EPOCH)
            .with_start_time(start_time)
            .begin_log_with(&mut buf, &mut ser)
            .unwrap();
        let mut last_write = Instant::now();
        // Number of samples recorded since the last interval was written.
        let mut unwritten: u64 = 0;
        loop {
            let terminate = match rx.recv() {
                Ok(Some(val)) => {
                    hist.record(val).unwrap();
                    unwritten += 1;
                    false
                }
                // `None` is the terminate command. `Err` means every sender is
                // gone, so `recv` can never succeed again: stop rather than spin.
                Ok(None) | Err(_) => true,
            };
            let now = Instant::now();
            let dur = now - last_write;
            // On shutdown flush whatever is pending; otherwise write once per `freq`.
            let should_write = if terminate { unwritten > 0 } else { dur > freq };
            if should_write {
                // The interval's wall-clock start is "now" minus its measured length.
                let start = (SystemTime::now() - dur).duration_since(UNIX_EPOCH).unwrap();
                if wtr.write_histogram(&hist, start, dur, Tag::new(tag)).is_err() {
                    // Best effort: report the failure but keep collecting.
                    eprintln!("hist_log ({}): failed to write interval histogram", tag);
                }
                hist.clear();
                unwritten = 0;
                last_write = now;
            }
            if terminate {
                break;
            }
        }
    })
}
/// Runs the multiqueue broadcast latency benchmark.
///
/// * `runtime` - how long each sender thread keeps producing.
/// * `min_delay` / `max_delay` - delay bounds forwarded to each sender.
/// * `n_senders` - number of sender threads; must be greater than zero.
/// * `use_cores` - one receiver thread is spawned per entry and bound to that
///   core index via `bind_thread`.
///
/// Each sender publishes `Utc::now()` timestamps into a broadcast queue of
/// capacity 8. Each receiver computes the wall-clock difference between
/// receipt and the sent timestamp (clamped at zero, in nanoseconds) and
/// forwards it to the histogram logger thread. Receivers stop once all senders
/// have signalled completion or the queue is disconnected.
///
/// The function returns only after all sender and receiver threads have been
/// joined and the histogram logger has been told to terminate and joined, so
/// the log is complete on return.
///
/// # Panics
///
/// Panics if `n_senders` is zero.
fn multiqueue_broadcast_affinity_bench(
    runtime: Duration,
    min_delay: u64,
    max_delay: u64,
    n_senders: usize,
    use_cores: &[usize],
) {
    assert!(n_senders > 0, "n must be > 0");
    println!("starting multiqueue broadcast test: {} senders, cores: {:?}", n_senders, use_cores);
    let topo = Arc::new(Mutex::new(Topology::new()));
    let n_done = Arc::new(AtomicUsize::new(0));
    let (hist_tx, hist_rx) = channel();
    let hist_thread = hist_log(hist_rx, Duration::from_secs(1), "multiqueue_broadcast_affinity");
    let (tx, rx) = multiqueue::broadcast_queue(8);

    let mut receivers = Vec::with_capacity(use_cores.len());
    for &core_num in use_cores.iter() {
        let hist_tx = hist_tx.clone();
        let topo = Arc::clone(&topo);
        let rx = rx.add_stream();
        let n_done = Arc::clone(&n_done);
        receivers.push(thread::spawn(move || {
            bind_thread(topo, core_num);
            let mut count: u64 = 0;
            loop {
                match rx.try_recv() {
                    Ok(sent_time) => {
                        let now = Utc::now();
                        // Clamp negative deltas to zero before casting to u64.
                        let delta = now
                            .signed_duration_since(sent_time)
                            .num_nanoseconds()
                            .unwrap()
                            .max(0i64) as u64;
                        // Deliberately best effort: a logger that has gone away
                        // must not take the receiver down with it.
                        let _ = hist_tx.send(Some(delta));
                        count += 1;
                    }
                    Err(TryRecvError::Empty) => {}
                    Err(TryRecvError::Disconnected) => break,
                }
                if n_done.load(Ordering::Relaxed) >= n_senders {
                    break;
                }
            }
            println!("{} rx thread (core {}) rcvd {} messages", Utc::now(), core_num, count);
        }));
    }
    // The original receiver handle is not read from; only the per-thread streams are.
    rx.unsubscribe();

    let senders: Vec<_> = (0..n_senders)
        .map(|_| {
            sporadic_sender_with_signal(
                tx.clone(),
                Utc::now,
                runtime,
                min_delay,
                max_delay,
                Arc::clone(&n_done),
            )
        })
        .collect();

    for s in senders {
        let _ = s.join();
    }
    for r in receivers {
        let _ = r.join();
    }
    // Only now that every sample has been forwarded may the logger be told to
    // stop; sending the terminate command earlier would discard all samples.
    let _ = hist_tx.send(None);
    // Wait for the logger so the log is fully written before returning.
    let _ = hist_thread.join();
}
/// Entry point: runs the broadcast affinity benchmark for 30 seconds with
/// delay bounds 0 and 10_000, ten senders, and receivers on cores 0 through 3.
fn main() {
    let runtime = Duration::from_secs(30);
    let receiver_cores: [usize; 4] = [0, 1, 2, 3];
    multiqueue_broadcast_affinity_bench(runtime, 0, 10_000, 10, &receiver_cores);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment