Skip to content

Instantly share code, notes, and snippets.

@14427
Created January 26, 2013 19:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 14427/4644206 to your computer and use it in GitHub Desktop.
Save 14427/4644206 to your computer and use it in GitHub Desktop.
Signal merge test
use either::*;
use pipes::*;
use task::spawn;
// A type whose values can be duplicated through a shared reference.
//
// This trait is declared locally in this file; the signal code below uses it
// (together with `Owned`) as the bound on every value type it carries.
pub trait Clone {
// Returns a duplicate of the value behind `self`.
fn clone(&self) -> self;
}
// Blanket implementation: every `Copy` type is `Clone`, and cloning is done
// by copying the value out from behind the reference.
impl<T: Copy> T: Clone {
fn clone(&self) -> T {
copy *self
}
}
// Handle to a signal carrying values of type `T`.
//
// The handle does not hold the value itself; it only holds the sending end
// of a channel over which subscriber channels are delivered (see `add_chan`).
pub struct Signal<T: Clone Owned> {
// Sending end used to hand a subscriber's `Chan<T>` to whoever owns the
// matching port (the tasks spawned by `signal_loop`, `constant`, `merge2`).
priv update: Chan< Chan<T> >,
}
impl<T: Clone Owned> Signal<T> {
// Wraps the sending end of a subscription channel in a `Signal`.
static fn new(ch: Chan<Chan<T>>) -> Signal<T> {
Signal { update: ch }
}
// Registers `ch` as a subscriber by sending it over the signal's
// subscription channel. The `error!` calls are trace output emitted
// immediately before and after the send.
fn add_chan(&self, ch: Chan<T>) {
error!("Adding channel");
self.update.send(ch);
error!("Added channel");
}
// Method form of the free function `lift`: forwards `self` and `f` to it
// and returns the resulting `Signal<U>`.
fn lift<U: Clone Owned>(&self, f: ~fn(T) -> U) -> Signal<U> {
lift(self, f)
}
}
// Spawns the task that owns a signal's current value.
//
// The value starts as a clone of `default`. The task then services two ports:
//
// * `update`: each message that passes `filter` replaces the current value
//   with `process(message, current)`, and a clone of the new value is sent to
//   every subscriber. Messages rejected by `filter` are dropped.
// * `new_client`: each received channel is immediately sent a clone of the
//   current value and is then kept as a subscriber.
//
// A port is considered finished once its `try_recv` returns `None`
// (presumably because the sending side has gone away -- confirm against the
// pipes documentation). The task exits when both ports are finished.
#[inline(always)]
pub fn signal_loop<T: Clone Owned, U: Clone Owned>(
    default: U,
    update: Port<T>,
    new_client: Port<Chan<U>>,
    process: ~fn(T, U) -> U,
    filter: ~fn(&T) -> bool)
{
    error!("Signal loop");
    do spawn {
        error!("In loop task");
        let mut subscribers: ~[Chan<U>] = ~[];
        let mut current = default.clone();
        let mut update_open = true;
        let mut client_open = true;
        loop {
            error!("In loop");
            // Pick the port to service this iteration. While both are live we
            // wait on whichever is ready first; once one has finished we go
            // straight to the other; when neither is left the task is done.
            let ready = if client_open && update_open {
                select2i(&update, &new_client)
            } else if update_open {
                Left(())
            } else if client_open {
                Right(())
            } else {
                break
            };
            match ready {
                // Upstream value (or upstream finished).
                Left(()) => {
                    let received = update.try_recv();
                    match received {
                        None => update_open = false,
                        Some(input) => {
                            if filter(&input) {
                                current = process(input, current);
                                for subscribers.each |sub| {
                                    sub.send( current.clone() );
                                }
                            }
                        }
                    }
                }
                // New subscriber (or subscription channel finished).
                Right(()) => {
                    let incoming: Option<Chan<U>> = new_client.try_recv();
                    match incoming {
                        None => client_open = false,
                        Some(sub) => {
                            // A new subscriber first gets the current value.
                            sub.send( current.clone() );
                            subscribers.push(sub);
                        }
                    }
                }
            }
        }
        io::println("Exit Signal loop");
    }
}
// Derives a new signal from `signal` by applying `f` to each of its values.
//
// The function subscribes to `signal`, waits for the first value it delivers,
// and uses `f` of that value as the initial value of the derived signal. It
// then starts a `signal_loop` that accepts every update (the filter always
// returns true) and replaces the current value with `f(update)`.
//
// Fails with "Signal must have initial value" if the first receive yields
// `None`. The `error!` calls are trace output.
#[inline(always)]
pub fn lift<T: Clone Owned, U: Clone Owned>(signal: &Signal<T>, f: ~fn(T) -> U) -> Signal<U> {
    error!("Lifting...");
    // `source_*` carries values from the upstream signal to the new task;
    // `subscribe_*` carries subscription requests for the derived signal.
    let (source_port, source_chan) = pipes::stream();
    let (subscribe_port, subscribe_chan) = pipes::stream();
    error!("hello?");
    signal.add_chan(source_chan);
    error!("debug 2");
    // The first message from upstream seeds the derived signal.
    let first = source_port.try_recv();
    let initial = match first {
        Some(seed) => f( seed ),
        None => fail ~"Signal must have initial value",
    };
    error!("debug 3");
    signal_loop(initial, source_port, subscribe_port, |x, _| f(x), |_| true);
    error!("Lifted");
    Signal::new(subscribe_chan)
}
// Creates a signal whose value never changes.
//
// A task is spawned that answers every subscription request by sending the
// subscriber one clone of `value`. The task stops, printing
// "Exiting constant", once receiving a request yields `None`.
#[inline(always)]
pub fn constant<T: Clone Owned>(value: T) -> Signal<T> {
    let (requests, handle) = pipes::stream();
    do spawn {
        let mut serving = true;
        while serving {
            let request: Option<Chan<T>> = requests.try_recv();
            match request {
                Some(subscriber) => subscriber.send( value.clone() ),
                None => serving = false,
            }
        }
        io::println("Exiting constant");
    }
    Signal::new(handle)
}
/*#[doc(hidden)]
extern mod rustrt {
#[rust_stack]
unsafe fn rust_get_task() -> *rust_task;
}*/
// Combines two signals into one signal of pairs.
//
// The function subscribes to `one` and `two`, then spawns a task that keeps
// the most recent value seen from each. Whenever either input delivers a new
// value, the pair `(last1, last2)` is sent to every subscriber of the merged
// signal. A new subscriber is immediately sent the current pair.
//
// The task keeps running while any of the three ports (two inputs plus the
// subscription port) is still considered open.
#[inline(always)]
pub fn merge2<T: Clone Owned, U: Clone Owned>(one: &Signal<T>, two: &Signal<U>) -> Signal<(T, U)> {
// `port`/`chan`: subscription requests for the merged signal.
let (port, chan) = pipes::stream();
// `updateN`/`clientN`: values flowing in from each input signal.
let (update1, client1) = pipes::stream();
let (update2, client2) = pipes::stream();
one.add_chan(client1);
two.add_chan(client2);
do spawn {
let mut chans: ~[Chan<(T, U)>] = ~[];
// Initial values of both inputs.
// NOTE(review): these use `recv()` rather than `try_recv()`; presumably
// the task fails here if an input closes before sending anything -- confirm.
let mut last1 = update1.recv();
let mut last2 = update2.recv();
// Set per iteration when an input delivered a new value.
let mut push: bool;
let mut u1_open = true;
let mut u2_open = true;
let mut client_open = true;
// Placeholder headers swapped into `ports` when the corresponding port is
// found closed (presumably so `selecti` never reports that slot as ready
// again -- TODO confirm against the pipes implementation).
let header1 = PacketHeader();
let header2 = PacketHeader();
let header3 = PacketHeader();
// The order here defines the indices matched below:
// 0 = first input, 1 = second input, 2 = subscription port.
let mut ports = ~[update1.header(), update2.header(), port.header()];
while u1_open || u2_open || client_open {
push = false;
error!("Loop: %?, %?, %?", u1_open, u2_open, client_open);
match selecti( ports ) {
// First input: new value, or the input has closed.
0 => {
error!("Left");
match update1.try_recv() {
Some(value) => {
last1 = value;
push = true;
}
None => {
error!("Left sender is closed");
u1_open = false;
ports[0] = &header1;
}
}
}
// Second input: new value, or the input has closed.
1 => {
error!("Right");
match update2.try_recv() {
Some(value) => {
last2 = value;
push = true;
}
None => {
error!("Right sender is closed");
u2_open = false;
ports[1] = &header2;
}
}
}
// Subscription port: new subscriber, or the port has closed.
2 => {
error!("New Client");
match port.try_recv() {
Some(ch) => {
// Rebinding with an explicit type annotation.
let ch: Chan<(T, U)> = ch;
// A new subscriber first gets the current pair.
let value = (last1.clone(), last2.clone());
ch.send( value );
chans.push( ch );
}
None => {
error!("Controller is closed");
client_open = false;
ports[2] = &header3;
}
}
}
// `ports` has exactly three entries, so no other index is expected.
_ => fail ~"wut?",
}
error!("Pushing?: %?", push);
if push {
for chans.each |ch| {
let value = (last1.clone(), last2.clone());
// NOTE(review): a `false` result from `try_send` takes down the
// whole merge task instead of dropping just that subscriber; the
// failure message itself marks this as needing a fix.
if !ch.try_send( value ) {
fail ~"This need to get fixed";
}
}
}
}
error!("Exiting merge");
}
Signal::new(chan)
}
fn main() {
let s1 = &constant(5);
let s2 = &constant(3);
let signal = merge2(s1, s2);
signal.lift(|(a, b)| error!("Lifting: %?", a+b ));
//loop {
//io::print(fmt!("%?", signal));
//}
io::println("Bye");
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment