Created
January 26, 2013 19:53
-
-
Save 14427/4644206 to your computer and use it in GitHub Desktop.
Signal merge test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use either::*; | |
use pipes::*; | |
use task::spawn; | |
pub trait Clone { | |
fn clone(&self) -> self; | |
} | |
/// Blanket implementation: any `Copy` type is `Clone`, and cloning is simply
/// an explicit `copy` of the borrowed value.
impl<T: Copy> T: Clone {
    fn clone(&self) -> T {
        copy *self
    }
}
/// Handle to a signal carrying values of type `T`.
///
/// The handle holds only a channel; client channels are registered by sending
/// them over it (see `add_chan`).
pub struct Signal<T: Clone Owned> {
    // Channel over which client channels (`Chan<T>`) are sent.
    priv update: Chan< Chan<T> >,
}
impl<T: Clone Owned> Signal<T> {
    // Wraps the registration channel `ch` in a `Signal` handle.
    static fn new(ch: Chan<Chan<T>>) -> Signal<T> {
        Signal { update: ch }
    }

    // Registers `ch` as a client by sending it over the registration channel.
    // Logs (at error level) before and after the send.
    fn add_chan(&self, ch: Chan<T>) {
        error!("Adding channel");
        self.update.send(ch);
        error!("Added channel");
    }

    // Method form of the free function `lift`: builds a `Signal<U>` from this
    // signal and the mapping function `f`.
    fn lift<U: Clone Owned>(&self, f: ~fn(T) -> U) -> Signal<U> {
        lift(self, f)
    }
}
#[inline(always)] | |
pub fn signal_loop<T: Clone Owned, U: Clone Owned>( | |
default: U, | |
update: Port<T>, | |
new_client: Port<Chan<U>>, | |
process: ~fn(T, U) -> U, | |
filter: ~fn(&T) -> bool) | |
{ | |
error!("Signal loop"); | |
do spawn { | |
error!("In loop task"); | |
let mut chans: ~[Chan<U>] = ~[]; | |
let mut value = default.clone(); | |
let mut update_open = true; | |
let mut client_open = true; | |
loop { | |
error!("In loop"); | |
if client_open && update_open { | |
match select2i(&update, &new_client) { | |
Left(()) => { | |
let opt_tmp = update.try_recv(); | |
match opt_tmp { | |
Some(tmp) => { | |
if filter(&tmp) { | |
value = process(tmp, value); | |
for chans.each |c| { | |
c.send( value.clone() ); | |
} | |
} | |
}, | |
None => update_open = false, | |
} | |
}, | |
Right(()) => { | |
let opt_ch: Option<Chan<U>> = new_client.try_recv(); | |
match opt_ch { | |
Some(ch) => { | |
ch.send( value.clone() ); | |
chans.push(ch); | |
}, | |
None => client_open = false, | |
} | |
}, | |
} | |
} else if update_open { | |
let opt_tmp = update.try_recv(); | |
match opt_tmp { | |
Some(tmp) => { | |
if filter(&tmp) { | |
value = process(tmp, value); | |
for chans.each |c| { | |
c.send( value.clone() ); | |
} | |
} | |
}, | |
None => update_open = false, | |
} | |
} else if client_open { | |
let opt_ch: Option<Chan<U>> = new_client.try_recv(); | |
match opt_ch { | |
Some(ch) => { | |
ch.send( value.clone() ); | |
chans.push(ch); | |
}, | |
None => client_open = false, | |
} | |
} else { | |
break | |
} | |
} | |
io::println("Exit Signal loop"); | |
} | |
} | |
#[inline(always)] | |
pub fn lift<T: Clone Owned, U: Clone Owned>(signal: &Signal<T>, f: ~fn(T) -> U) -> Signal<U> { | |
error!("Lifting..."); | |
let (update, chan) = pipes::stream(); | |
let (client_port, client_chan) = pipes::stream(); | |
error!("hello?"); | |
signal.add_chan(chan); | |
error!("debug 2"); | |
let initial = match update.try_recv() { | |
Some(value) => f( value ), | |
None => fail ~"Signal must have initial value", | |
}; | |
error!("debug 3"); | |
signal_loop(initial, update, client_port, |x, _| f(x), |_| true); | |
error!("Lifted"); | |
Signal::new(client_chan) | |
} | |
#[inline(always)] | |
pub fn constant<T: Clone Owned>(value: T) -> Signal<T> { | |
let (port, chan) = pipes::stream(); | |
do spawn { | |
loop { | |
let client: Option<Chan<T>> = port.try_recv(); | |
match client { | |
Some(ch) => ch.send( value.clone() ), | |
None => break, | |
} | |
} | |
io::println("Exiting constant"); | |
} | |
Signal::new(chan) | |
} | |
/*#[doc(hidden)] | |
extern mod rustrt { | |
#[rust_stack] | |
unsafe fn rust_get_task() -> *rust_task; | |
}*/ | |
#[inline(always)] | |
pub fn merge2<T: Clone Owned, U: Clone Owned>(one: &Signal<T>, two: &Signal<U>) -> Signal<(T, U)> { | |
let (port, chan) = pipes::stream(); | |
let (update1, client1) = pipes::stream(); | |
let (update2, client2) = pipes::stream(); | |
one.add_chan(client1); | |
two.add_chan(client2); | |
do spawn { | |
let mut chans: ~[Chan<(T, U)>] = ~[]; | |
let mut last1 = update1.recv(); | |
let mut last2 = update2.recv(); | |
let mut push: bool; | |
let mut u1_open = true; | |
let mut u2_open = true; | |
let mut client_open = true; | |
let header1 = PacketHeader(); | |
let header2 = PacketHeader(); | |
let header3 = PacketHeader(); | |
let mut ports = ~[update1.header(), update2.header(), port.header()]; | |
while u1_open || u2_open || client_open { | |
push = false; | |
error!("Loop: %?, %?, %?", u1_open, u2_open, client_open); | |
match selecti( ports ) { | |
0 => { | |
error!("Left"); | |
match update1.try_recv() { | |
Some(value) => { | |
last1 = value; | |
push = true; | |
} | |
None => { | |
error!("Left sender is closed"); | |
u1_open = false; | |
ports[0] = &header1; | |
} | |
} | |
} | |
1 => { | |
error!("Right"); | |
match update2.try_recv() { | |
Some(value) => { | |
last2 = value; | |
push = true; | |
} | |
None => { | |
error!("Right sender is closed"); | |
u2_open = false; | |
ports[1] = &header2; | |
} | |
} | |
} | |
2 => { | |
error!("New Client"); | |
match port.try_recv() { | |
Some(ch) => { | |
let ch: Chan<(T, U)> = ch; | |
let value = (last1.clone(), last2.clone()); | |
ch.send( value ); | |
chans.push( ch ); | |
} | |
None => { | |
error!("Controller is closed"); | |
client_open = false; | |
ports[2] = &header3; | |
} | |
} | |
} | |
_ => fail ~"wut?", | |
} | |
error!("Pushing?: %?", push); | |
if push { | |
for chans.each |ch| { | |
let value = (last1.clone(), last2.clone()); | |
if !ch.try_send( value ) { | |
fail ~"This need to get fixed"; | |
} | |
} | |
} | |
} | |
error!("Exiting merge"); | |
} | |
Signal::new(chan) | |
} | |
fn main() { | |
let s1 = &constant(5); | |
let s2 = &constant(3); | |
let signal = merge2(s1, s2); | |
signal.lift(|(a, b)| error!("Lifting: %?", a+b )); | |
//loop { | |
//io::print(fmt!("%?", signal)); | |
//} | |
io::println("Bye"); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment