Created
July 9, 2014 18:40
-
-
Save apoelstra/0dd318a5b6d1bea48881 to your computer and use it in GitHub Desktop.
new select.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT | |
// file at the top-level directory of this distribution and at | |
// http://rust-lang.org/COPYRIGHT. | |
// | |
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or | |
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license | |
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your | |
// option. This file may not be copied, modified, or distributed | |
// except according to those terms. | |
//! Selection over an array of receivers | |
//! | |
//! This module contains the implementation machinery necessary for selecting | |
//! over a number of receivers. One large goal of this module is to provide an | |
//! efficient interface to selecting over any receiver of any type. | |
//! | |
//! This is achieved through an architecture of a "receiver set" in which | |
//! receivers are added to a set and then the entire set is waited on at once. | |
//! The set can be waited on multiple times to prevent re-adding each receiver | |
//! to the set. | |
//! | |
//! See https://github.com/rust-lang/rust/issues/12902 for reason for replacement --asp | |
//! | |
use alloc::owned::Box; | |
use core::cell::Cell; | |
use core::kinds::marker; | |
use core::mem; | |
use core::uint; | |
use rustrt::local::Local; | |
use rustrt::task::Task; | |
use sync::comm::Packet; | |
use sync::comm::Receiver; | |
/// The "receiver set" of the select interface. This structure is used to manage | |
/// a set of receivers which are being selected over. | |
pub struct Select<'sel, U> { | |
head: Option<Box<Handle<'sel, (), U>>>, | |
tail: *mut Handle<'sel, (), U>, | |
next_id: Cell<uint>, | |
marker1: marker::NoSend, | |
} | |
/// A handle to a receiver which is currently a member of a `Select` set of | |
/// receivers. This handle is used to keep the receiver in the set as well as | |
/// interact with the underlying receiver. | |
struct Handle<'sel, T, U> { | |
id: uint, | |
next: Option<Box<Handle<'sel, (), U>>>, | |
// due to our fun transmutes, we be sure to place this at the end. (nothing | |
// previous relies on T) | |
callback: |T|: 'sel -> U, | |
rx: Receiver<T>, | |
} | |
impl<'sel, T:Send, U> Handle<'sel, T, U> { | |
fn recv_cb(&mut self, abs_recv: &'sel Receiver<()>) -> U { | |
let recv: &Receiver<T> = unsafe { | |
mem::transmute(abs_recv) | |
}; | |
(self.callback)(recv.recv()) | |
} | |
} | |
/// A handle which is used to receive directly from a `Receiver` owned by a `Select` | |
pub struct RecvHandle<T>(uint); | |
impl<T:Send> RecvHandle<T> { | |
/// Receive data from the underlying `Receiver` | |
pub fn recv_from<'sel, U>(&self, sel: &Select<'sel, U>) -> T { | |
let &RecvHandle(id) = self; | |
for handle in sel.iter() { | |
if unsafe { (*handle).id } == id { | |
let concrete_handle: &Handle<'sel, T, U> = unsafe { mem::transmute(handle) }; | |
return concrete_handle.rx.recv(); | |
} | |
} | |
fail!("Tried to receive from an invalid handle!"); | |
} | |
} | |
struct Packets<'sel, U> { cur: *mut Handle<'sel, (), U> } | |
impl<'sel, U> Select<'sel, U> { | |
/// Creates a new selection structure. This set is initially empty and | |
/// `wait` will fail!() if called. | |
/// | |
/// Usage of this struct directly can sometimes be burdensome, and usage is | |
/// rather much easier through the `select!` macro. | |
pub fn new() -> Select<'sel, U> { | |
Select { | |
marker1: marker::NoSend, | |
head: None, | |
tail: RawPtr::null(), | |
next_id: Cell::new(1), | |
} | |
} | |
/// Creates a new handle into this receiver set for a new receiver. Note | |
/// that this does *not* add the receiver to the receiver set, for that you | |
/// must call the `add` method on the handle itself. | |
pub fn add<T: Send>(&mut self, rx: Receiver<T>, callback: |T|: 'sel -> U) -> RecvHandle<T> { | |
let id = self.next_id.get(); | |
self.next_id.set(id + 1); | |
let new = box Handle { | |
id: id, | |
next: None, | |
rx: rx, | |
callback: callback | |
}; | |
// Drop type information | |
let mut new_abs: Box<Handle<'sel, (), U>> = unsafe { | |
use std::mem::transmute; | |
transmute(new) | |
}; | |
// Install in linked list | |
match self.head { | |
None => { | |
self.tail = &mut *new_abs as *mut _; | |
self.head = Some(new_abs); | |
RecvHandle(self.head.get_ref().id) | |
} | |
Some(_) => { | |
unsafe { | |
(*self.tail).next = Some(new_abs); | |
self.tail = &mut **(*self.tail).next.get_mut_ref() as *mut _; | |
RecvHandle((*self.tail).id) | |
} | |
} | |
} | |
} | |
/// Waits for an event on this receiver set. The returned value is *not* an | |
/// index, but rather an id. This id can be queried against any active | |
/// `Handle` structures (each one has an `id` method). The handle with | |
/// the matching `id` will have some sort of event available on it. The | |
/// event could either be that data is available or the corresponding | |
/// channel has been closed. | |
pub fn recv(&self) -> U { | |
self.recv2(true) | |
} | |
/// Helper method for skipping the preflight checks during testing | |
fn recv2(&self, do_preflight_checks: bool) -> U { | |
// Note that this is currently an inefficient implementation. We in | |
// theory have knowledge about all receivers in the set ahead of time, | |
// so this method shouldn't really have to iterate over all of them yet | |
// again. The idea with this "receiver set" interface is to get the | |
// interface right this time around, and later this implementation can | |
// be optimized. | |
// | |
// This implementation can be summarized by: | |
// | |
// fn select(receivers) { | |
// if any receiver ready { return ready index } | |
// deschedule { | |
// block on all receivers | |
// } | |
// unblock on all receivers | |
// return ready index | |
// } | |
// | |
// Most notably, the iterations over all of the receivers shouldn't be | |
// necessary. | |
unsafe { | |
let mut ret: U = mem::uninitialized(); | |
let mut amt = 0; | |
for p in self.iter() { | |
let packet = &(*p).rx as &Packet; | |
amt += 1; | |
if do_preflight_checks && packet.can_recv() { | |
return (*p).recv_cb(mem::transmute(p)); | |
} | |
} | |
assert!(amt > 0); | |
let mut ready_index = amt; | |
let mut ready_id = uint::MAX; | |
let mut iter = self.iter().enumerate(); | |
// Acquire a number of blocking contexts, and block on each one | |
// sequentially until one fails. If one fails, then abort | |
// immediately so we can go unblock on all the other receivers. | |
let task: Box<Task> = Local::take(); | |
task.deschedule(amt, |task| { | |
// Prepare for the block | |
let (i, handle) = iter.next().unwrap(); | |
let packet = &(*handle).rx as &Packet; | |
match packet.start_selection(task) { | |
Ok(()) => Ok(()), | |
Err(task) => { | |
ready_index = i; | |
Err(task) | |
} | |
} | |
}); | |
// Abort the selection process on each receiver. If the abort | |
// process returns `true`, then that means that the receiver is | |
// ready to receive some data. Note that this also means that the | |
// receiver may have yet to have fully read the `to_wake` field and | |
// woken us up (although the wakeup is guaranteed to fail). | |
// | |
// This situation happens in the window of where a sender invokes | |
// increment(), sees -1, and then decides to wake up the task. After | |
// all this is done, the sending thread will set `selecting` to | |
// `false`. Until this is done, we cannot return. If we were to | |
// return, then a sender could wake up a receiver which has gone | |
// back to sleep after this call to `select`. | |
// | |
// Note that it is a "fairly small window" in which an increment() | |
// views that it should wake a thread up until the `selecting` bit | |
// is set to false. For now, the implementation currently just spins | |
// in a yield loop. This is very distasteful, but this | |
// implementation is already nowhere near what it should ideally be. | |
// A rewrite should focus on avoiding a yield loop, and for now this | |
// implementation is tying us over to a more efficient "don't | |
// iterate over everything every time" implementation. | |
for handle in self.iter().take(ready_index) { | |
let packet = &(*handle).rx as &Packet; | |
if packet.abort_selection() { | |
ready_id = (*handle).id; | |
ret = (*handle).recv_cb(mem::transmute(handle)); | |
} | |
} | |
assert!(ready_id != uint::MAX); | |
return ret; | |
} | |
} | |
fn iter(&self) -> Packets<'sel, U> { | |
Packets { | |
cur: match self.head { | |
None => RawPtr::null(), | |
Some(ref t) => &**t as *const _ as *mut _ | |
} | |
} | |
} | |
} | |
impl<'sel, U> Iterator<*mut Handle<'sel, (), U>> for Packets<'sel, U> { | |
fn next(&mut self) -> Option<*mut Handle<'sel, (), U>> { | |
if self.cur.is_null() { | |
None | |
} else { | |
let ret = self.cur; | |
self.cur = match unsafe { &(*ret).next } { | |
&None => RawPtr::null(), | |
&Some(ref p) => &**p as *const _ as *mut _ | |
}; | |
Some(ret) | |
} | |
} | |
} | |
#[cfg(test)] | |
#[allow(unused_imports)] | |
mod test { | |
use std::prelude::*; | |
use super::super::*; | |
// Don't use the libstd version so we can pull in the right Select structure | |
// (std::comm points at the wrong one) | |
macro_rules! select { | |
( | |
$($name:pat = $rx:ident.$meth:ident() => $code:expr),+ | |
) => ({ | |
use comm::Select; | |
let sel = Select::new(); | |
$( let mut $rx = sel.handle(&$rx); )+ | |
unsafe { | |
$( $rx.add(); )+ | |
} | |
let ret = sel.wait(); | |
$( if ret == $rx.id() { let $name = $rx.$meth(); $code } else )+ | |
{ unreachable!() } | |
}) | |
} | |
test!(fn smoke() { | |
let (tx1, rx1) = channel::<int>(); | |
let (tx2, rx2) = channel::<int>(); | |
tx1.send(1); | |
select! ( | |
foo = rx1.recv() => { assert_eq!(foo, 1); }, | |
_bar = rx2.recv() => { fail!() } | |
) | |
tx2.send(2); | |
select! ( | |
_foo = rx1.recv() => { fail!() }, | |
bar = rx2.recv() => { assert_eq!(bar, 2) } | |
) | |
drop(tx1); | |
select! ( | |
foo = rx1.recv_opt() => { assert_eq!(foo, Err(())); }, | |
_bar = rx2.recv() => { fail!() } | |
) | |
drop(tx2); | |
select! ( | |
bar = rx2.recv_opt() => { assert_eq!(bar, Err(())); } | |
) | |
}) | |
test!(fn smoke2() { | |
let (_tx1, rx1) = channel::<int>(); | |
let (_tx2, rx2) = channel::<int>(); | |
let (_tx3, rx3) = channel::<int>(); | |
let (_tx4, rx4) = channel::<int>(); | |
let (tx5, rx5) = channel::<int>(); | |
tx5.send(4); | |
select! ( | |
_foo = rx1.recv() => { fail!("1") }, | |
_foo = rx2.recv() => { fail!("2") }, | |
_foo = rx3.recv() => { fail!("3") }, | |
_foo = rx4.recv() => { fail!("4") }, | |
foo = rx5.recv() => { assert_eq!(foo, 4); } | |
) | |
}) | |
test!(fn closed() { | |
let (_tx1, rx1) = channel::<int>(); | |
let (tx2, rx2) = channel::<int>(); | |
drop(tx2); | |
select! ( | |
_a1 = rx1.recv_opt() => { fail!() }, | |
a2 = rx2.recv_opt() => { assert_eq!(a2, Err(())); } | |
) | |
}) | |
test!(fn unblocks() { | |
let (tx1, rx1) = channel::<int>(); | |
let (_tx2, rx2) = channel::<int>(); | |
let (tx3, rx3) = channel::<int>(); | |
spawn(proc() { | |
for _ in range(0u, 20) { task::deschedule(); } | |
tx1.send(1); | |
rx3.recv(); | |
for _ in range(0u, 20) { task::deschedule(); } | |
}); | |
select! ( | |
a = rx1.recv() => { assert_eq!(a, 1); }, | |
_b = rx2.recv() => { fail!() } | |
) | |
tx3.send(1); | |
select! ( | |
a = rx1.recv_opt() => { assert_eq!(a, Err(())); }, | |
_b = rx2.recv() => { fail!() } | |
) | |
}) | |
test!(fn both_ready() { | |
let (tx1, rx1) = channel::<int>(); | |
let (tx2, rx2) = channel::<int>(); | |
let (tx3, rx3) = channel::<()>(); | |
spawn(proc() { | |
for _ in range(0u, 20) { task::deschedule(); } | |
tx1.send(1); | |
tx2.send(2); | |
rx3.recv(); | |
}); | |
select! ( | |
a = rx1.recv() => { assert_eq!(a, 1); }, | |
a = rx2.recv() => { assert_eq!(a, 2); } | |
) | |
select! ( | |
a = rx1.recv() => { assert_eq!(a, 1); }, | |
a = rx2.recv() => { assert_eq!(a, 2); } | |
) | |
assert_eq!(rx1.try_recv(), Err(Empty)); | |
assert_eq!(rx2.try_recv(), Err(Empty)); | |
tx3.send(()); | |
}) | |
test!(fn stress() { | |
static AMT: int = 10000; | |
let (tx1, rx1) = channel::<int>(); | |
let (tx2, rx2) = channel::<int>(); | |
let (tx3, rx3) = channel::<()>(); | |
spawn(proc() { | |
for i in range(0, AMT) { | |
if i % 2 == 0 { | |
tx1.send(i); | |
} else { | |
tx2.send(i); | |
} | |
rx3.recv(); | |
} | |
}); | |
for i in range(0, AMT) { | |
select! ( | |
i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1); }, | |
i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2); } | |
) | |
tx3.send(()); | |
} | |
}) | |
test!(fn cloning() { | |
let (tx1, rx1) = channel::<int>(); | |
let (_tx2, rx2) = channel::<int>(); | |
let (tx3, rx3) = channel::<()>(); | |
spawn(proc() { | |
rx3.recv(); | |
tx1.clone(); | |
assert_eq!(rx3.try_recv(), Err(Empty)); | |
tx1.send(2); | |
rx3.recv(); | |
}); | |
tx3.send(()); | |
select!( | |
_i1 = rx1.recv() => {}, | |
_i2 = rx2.recv() => fail!() | |
) | |
tx3.send(()); | |
}) | |
test!(fn cloning2() { | |
let (tx1, rx1) = channel::<int>(); | |
let (_tx2, rx2) = channel::<int>(); | |
let (tx3, rx3) = channel::<()>(); | |
spawn(proc() { | |
rx3.recv(); | |
tx1.clone(); | |
assert_eq!(rx3.try_recv(), Err(Empty)); | |
tx1.send(2); | |
rx3.recv(); | |
}); | |
tx3.send(()); | |
select!( | |
_i1 = rx1.recv() => {}, | |
_i2 = rx2.recv() => fail!() | |
) | |
tx3.send(()); | |
}) | |
test!(fn cloning3() { | |
let (tx1, rx1) = channel::<()>(); | |
let (tx2, rx2) = channel::<()>(); | |
let (tx3, rx3) = channel::<()>(); | |
spawn(proc() { | |
let s = Select::new(); | |
let mut h1 = s.handle(&rx1); | |
let mut h2 = s.handle(&rx2); | |
unsafe { h2.add(); } | |
unsafe { h1.add(); } | |
assert_eq!(s.wait(), h2.id); | |
tx3.send(()); | |
}); | |
for _ in range(0u, 1000) { task::deschedule(); } | |
drop(tx1.clone()); | |
tx2.send(()); | |
rx3.recv(); | |
}) | |
test!(fn preflight1() { | |
let (tx, rx) = channel(); | |
tx.send(()); | |
select!( | |
() = rx.recv() => {} | |
) | |
}) | |
test!(fn preflight2() { | |
let (tx, rx) = channel(); | |
tx.send(()); | |
tx.send(()); | |
select!( | |
() = rx.recv() => {} | |
) | |
}) | |
test!(fn preflight3() { | |
let (tx, rx) = channel(); | |
drop(tx.clone()); | |
tx.send(()); | |
select!( | |
() = rx.recv() => {} | |
) | |
}) | |
test!(fn preflight4() { | |
let (tx, rx) = channel(); | |
tx.send(()); | |
let s = Select::new(); | |
let mut h = s.handle(&rx); | |
unsafe { h.add(); } | |
assert_eq!(s.wait2(false), h.id); | |
}) | |
test!(fn preflight5() { | |
let (tx, rx) = channel(); | |
tx.send(()); | |
tx.send(()); | |
let s = Select::new(); | |
let mut h = s.handle(&rx); | |
unsafe { h.add(); } | |
assert_eq!(s.wait2(false), h.id); | |
}) | |
test!(fn preflight6() { | |
let (tx, rx) = channel(); | |
drop(tx.clone()); | |
tx.send(()); | |
let s = Select::new(); | |
let mut h = s.handle(&rx); | |
unsafe { h.add(); } | |
assert_eq!(s.wait2(false), h.id); | |
}) | |
test!(fn preflight7() { | |
let (tx, rx) = channel::<()>(); | |
drop(tx); | |
let s = Select::new(); | |
let mut h = s.handle(&rx); | |
unsafe { h.add(); } | |
assert_eq!(s.wait2(false), h.id); | |
}) | |
test!(fn preflight8() { | |
let (tx, rx) = channel(); | |
tx.send(()); | |
drop(tx); | |
rx.recv(); | |
let s = Select::new(); | |
let mut h = s.handle(&rx); | |
unsafe { h.add(); } | |
assert_eq!(s.wait2(false), h.id); | |
}) | |
test!(fn preflight9() { | |
let (tx, rx) = channel(); | |
drop(tx.clone()); | |
tx.send(()); | |
drop(tx); | |
rx.recv(); | |
let s = Select::new(); | |
let mut h = s.handle(&rx); | |
unsafe { h.add(); } | |
assert_eq!(s.wait2(false), h.id); | |
}) | |
test!(fn oneshot_data_waiting() { | |
let (tx1, rx1) = channel(); | |
let (tx2, rx2) = channel(); | |
spawn(proc() { | |
select! { | |
() = rx1.recv() => {} | |
} | |
tx2.send(()); | |
}); | |
for _ in range(0u, 100) { task::deschedule() } | |
tx1.send(()); | |
rx2.recv(); | |
}) | |
test!(fn stream_data_waiting() { | |
let (tx1, rx1) = channel(); | |
let (tx2, rx2) = channel(); | |
tx1.send(()); | |
tx1.send(()); | |
rx1.recv(); | |
rx1.recv(); | |
spawn(proc() { | |
select! { | |
() = rx1.recv() => {} | |
} | |
tx2.send(()); | |
}); | |
for _ in range(0u, 100) { task::deschedule() } | |
tx1.send(()); | |
rx2.recv(); | |
}) | |
test!(fn shared_data_waiting() { | |
let (tx1, rx1) = channel(); | |
let (tx2, rx2) = channel(); | |
drop(tx1.clone()); | |
tx1.send(()); | |
rx1.recv(); | |
spawn(proc() { | |
select! { | |
() = rx1.recv() => {} | |
} | |
tx2.send(()); | |
}); | |
for _ in range(0u, 100) { task::deschedule() } | |
tx1.send(()); | |
rx2.recv(); | |
}) | |
test!(fn sync1() { | |
let (tx, rx) = sync_channel::<int>(1); | |
tx.send(1); | |
select! { | |
n = rx.recv() => { assert_eq!(n, 1); } | |
} | |
}) | |
test!(fn sync2() { | |
let (tx, rx) = sync_channel::<int>(0); | |
spawn(proc() { | |
for _ in range(0u, 100) { task::deschedule() } | |
tx.send(1); | |
}); | |
select! { | |
n = rx.recv() => { assert_eq!(n, 1); } | |
} | |
}) | |
test!(fn sync3() { | |
let (tx1, rx1) = sync_channel::<int>(0); | |
let (tx2, rx2): (Sender<int>, Receiver<int>) = channel(); | |
spawn(proc() { tx1.send(1); }); | |
spawn(proc() { tx2.send(2); }); | |
select! { | |
n = rx1.recv() => { | |
assert_eq!(n, 1); | |
assert_eq!(rx2.recv(), 2); | |
}, | |
n = rx2.recv() => { | |
assert_eq!(n, 2); | |
assert_eq!(rx1.recv(), 1); | |
} | |
} | |
}) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment