Created
March 14, 2018 02:42
-
-
Save jonathanstrong/d88d474f5b0ea147e5bc98d9df0910a2 to your computer and use it in GitHub Desktop.
multiqueue bench
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#![feature(duration_from_micros)] | |
extern crate hdrhistogram; | |
extern crate multiqueue; | |
extern crate chrono; | |
extern crate libc; | |
extern crate hwloc; // tested with v0.3 | |
extern crate rand; | |
use std::sync::{Arc, Mutex}; | |
use std::sync::atomic::{AtomicUsize, Ordering}; | |
use std::sync::mpsc::{Sender, Receiver, SendError, channel, TryRecvError, TrySendError}; | |
use std::thread::{self, JoinHandle}; | |
use std::time::{Instant, Duration, SystemTime, UNIX_EPOCH}; | |
use std::{io, fs}; | |
use rand::distributions::{Range, Sample}; | |
use hdrhistogram::Histogram; | |
use hdrhistogram::serialization::{Serializer, V2DeflateSerializer, V2Serializer}; | |
use hdrhistogram::serialization::interval_log::{IntervalLogWriterBuilder, Tag}; | |
use hwloc::{Topology, ObjectType, CPUBIND_THREAD, CpuSet}; | |
use chrono::{DateTime, Utc}; | |
fn get_thread_id() -> libc::pthread_t { | |
unsafe { libc::pthread_self() } | |
} | |
pub fn bind_thread(topo: Arc<Mutex<Topology>>, core_num: usize) { | |
let tid = get_thread_id(); | |
let mut lock = topo.lock().unwrap(); | |
let mut core = | |
lock.objects_with_type(&ObjectType::Core).ok().and_then(|cores| { | |
cores.get(core_num) | |
.and_then(|core| { | |
core.cpuset() | |
}) | |
}).unwrap_or_else(|| { | |
panic!("failed to get core"); | |
}); | |
core.singlify(); | |
lock.set_cpubind_for_thread(tid, core, CPUBIND_THREAD).unwrap_or_else(|e| { | |
panic!("failed to get bind thread to core"); | |
}); | |
} | |
/// Minimal "put one item into a queue" abstraction.
///
/// This was used to plug many different queue types into the benchmarks.
trait Puts {
    /// The value type accepted by the queue.
    type Item;
    /// The error produced when an item cannot be enqueued.
    type Err;

    /// Attempts to enqueue `item`, returning `Err` if the queue did not accept it.
    fn send(&self, item: Self::Item) -> Result<(), Self::Err>;
}
impl<T: Clone> Puts for multiqueue::BroadcastSender<T> { | |
type Item = T; | |
type Err = TrySendError<T>; | |
fn send(&self, item: Self::Item) -> Result<(), Self::Err> { | |
self.try_send(item) | |
} | |
} | |
fn sporadic_sender_with_signal<P, I, F>( | |
tx: P, | |
new_item: F, | |
term: Duration, | |
min_delay: u64, | |
max_delay: u64, | |
done: Arc<AtomicUsize>, | |
) -> JoinHandle<()> | |
where P: Puts<Item = I, Err = TrySendError<I>> + Send + 'static, | |
F: Fn() -> I + Send + 'static | |
{ | |
thread::spawn(move || { | |
let start = Instant::now(); | |
let mut rng = rand::thread_rng(); | |
let mut range = Range::new(min_delay, max_delay); | |
let mut spins = 0; | |
while Instant::now() - start < term { | |
let mut item = Some(new_item()); | |
while item.is_some() { | |
item = item.and_then(|item| { | |
match tx.send(item) { | |
Err(TrySendError::Full(item)) => { | |
spins += 1; | |
Some(item) | |
} | |
_ => None | |
} | |
}); | |
} | |
thread::sleep(Duration::from_micros(range.sample(&mut rng))); | |
} | |
drop(tx); | |
done.fetch_add(1, Ordering::Relaxed); | |
if spins > 0 { | |
println!("sporadic sender lifetime spins: {}", spins); | |
} | |
}) | |
} | |
fn hist_log(rx: Receiver<Option<u64>>, freq: Duration, tag: &'static str) -> JoinHandle<()> { | |
thread::spawn(move || { | |
let mut ser = V2DeflateSerializer::new(); | |
let start_time = SystemTime::now(); | |
let seconds = start_time.duration_since(UNIX_EPOCH).unwrap().as_secs(); | |
let file = fs::File::create(&format!("hist-log-{}.v2z", seconds)).unwrap(); | |
let mut buf = io::LineWriter::new(file); | |
let mut hist = Histogram::<u64>::new(3).unwrap(); | |
let mut wtr = | |
IntervalLogWriterBuilder::new() | |
.with_base_time(UNIX_EPOCH) | |
.with_start_time(start_time) | |
.begin_log_with(&mut buf, &mut ser) | |
.unwrap(); | |
let mut last_write = Instant::now(); | |
loop { | |
match rx.recv() { | |
Ok(Some(val)) => { | |
hist.record(val).unwrap(); | |
let now = Instant::now(); | |
if now - last_write > freq { | |
let sys_now = SystemTime::now(); | |
let dur = now - last_write; | |
let start = sys_now - dur; | |
let start = start.duration_since(UNIX_EPOCH).unwrap(); | |
wtr.write_histogram(&hist, start, dur, Tag::new(tag)); | |
hist.clear(); | |
last_write = now; | |
} | |
} | |
Ok(None) => break, // sending None is terminate command | |
_ => { thread::yield_now(); } | |
} | |
} | |
}) | |
} | |
fn multiqueue_broadcast_affinity_bench( | |
runtime: Duration, | |
min_delay: u64, | |
max_delay: u64, | |
n_senders: usize, | |
use_cores: &[usize], | |
) { | |
assert!(n_senders > 0, "n must be > 0"); | |
println!("starting multiqueue broadcast test: {} senders, cores: {:?}", n_senders, use_cores); | |
let topo = Arc::new(Mutex::new(Topology::new())); | |
let n_done = Arc::new(AtomicUsize::new(0)); | |
let mut senders = Vec::new(); | |
let mut receivers = Vec::new(); | |
let (hist_tx, hist_rx) = channel(); | |
let hist_thread = hist_log(hist_rx, Duration::from_secs(1), "multiqueue_broadcast_affinity"); | |
let (tx, rx) = multiqueue::broadcast_queue(8); | |
for &core_num in use_cores.iter() { | |
let hist_tx = hist_tx.clone(); | |
let topo = Arc::clone(&topo); | |
let rx = rx.add_stream(); | |
let n_done = Arc::clone(&n_done); | |
receivers.push(thread::spawn(move || { | |
bind_thread(topo, core_num); | |
let mut count = 0; | |
loop { | |
match rx.try_recv() { | |
Ok(sent_time) => { | |
let now = Utc::now(); | |
let delta = | |
now.signed_duration_since(sent_time) | |
.num_nanoseconds() | |
.unwrap() | |
.max(0i64) | |
as u64; | |
hist_tx.send(Some(delta)); | |
count += 1; | |
} | |
Err(TryRecvError::Empty) => {} | |
Err(TryRecvError::Disconnected) => break, | |
} | |
if n_done.load(Ordering::Relaxed) >= n_senders { break } | |
} | |
println!("{} rx thread (core {}) rcvd {} messages", Utc::now(), core_num, count); | |
})); | |
} | |
rx.unsubscribe(); | |
hist_tx.send(None); | |
for _ in 0..n_senders { | |
senders.push( | |
sporadic_sender_with_signal( | |
tx.clone(), || Utc::now(), runtime, min_delay, max_delay, Arc::clone(&n_done))); | |
} | |
for r in receivers { let _ = r.join(); } | |
} | |
fn main() { | |
multiqueue_broadcast_affinity_bench(Duration::from_secs(30), 0, 10_000, 10, &[0, 1, 2, 3]); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment