Skip to content

Instantly share code, notes, and snippets.

@manuels
Created February 23, 2014 22:15
Show Gist options
  • Save manuels/9178144 to your computer and use it in GitHub Desktop.
Save manuels/9178144 to your computer and use it in GitHub Desktop.
extern crate sync;
extern crate collections;
use std::comm::Chan;
use std::mem::size_of_val;
use std::io::{ChanWriter,PortReader};
use collections::hashmap::HashMap;
use std::task::try;
use sync::RWArc;
use std::rand::random;
use std::io::timer::sleep;
struct b {
i: u8
}
impl b {
fn new() -> b { b{i: 1} }
}
struct MultiplexStream {
// arc: ~RWArc<HashMap<u32, ~Chan<~[u8]>>>,
arc: ~RWArc<HashMap<u32, b>>,
downstream_chan: Chan<~[u8]>
}
impl MultiplexStream {
fn new(downstream: (Port<~[u8]>, Chan<~[u8]>)) -> ~MultiplexStream {
let (downstream_port, downstream_chan) = downstream;
let mux = ~MultiplexStream {
arc: ~RWArc::new(HashMap::new()),
downstream_chan: downstream_chan
};
// begin
let arc = mux.arc.clone();
spawn(proc() {
try(proc() {
loop {
let mut reader = PortReader::new(downstream_port);
let port_num = reader.read_le_u32().unwrap();
let data = reader.read_to_end().unwrap();
arc.read(|open_ports| {
match open_ports.find(&port_num) {
Some(intermediate) => {
/* let success = intermediate.try_send(data.clone());
if !success {
arc.write(|open_ports| {
open_ports.remove(&port_num);
})
}*/
},
None => {}
};
});
}
return ();
});
// downstream was probably closed => cleanup
arc.write(|open_ports| {
//let iter = open_ports.move_iter();
//TODO: takeall
/*
for c in iter {
// TODO: does this really do what I expect it to do?
// Is this really neccessary? As soon as we 'take' the
// channel and we reach the next iteration, the channel
// should be closed.
c.drop()
}
*/
})
});
// end
return mux;
}
/*
fn is_port_open(self, port_num: u32) -> bool {
let arc = self.arc.clone();
do arc.read |open_ports| {
let res = open_ports.contains_key(portr_num);
return res;
}
}
*/
fn open(self, port_num: u32) -> Result<(Port<~b>, Chan<~[u8]>), ()> {
let arc = self.arc.clone();
// let (upstream_port, intermediate_chan): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
let (upstream_port, intermediate_chan): (Port<~b>, Chan<~b>) = Chan::new();
let (intermediate_port, upstream_chan): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
let upstream = (upstream_port, upstream_chan);
let port_is_already_open = arc.write(|open_ports| {
//let res = open_ports.find_or_insert(port_num, intermediate_chan);
//let port_is_already_open = (res == intermediate_chan);
if open_ports.contains_key(&port_num) {
return true;
}
else {
open_ports.insert(port_num, b::new());
return false;
}
});
if port_is_already_open {
return Err(());
};
spawn(proc() {
//do try {
loop {
let data = intermediate_port.recv();
let mut writer = ChanWriter::new(self.downstream_chan);
writer.write_le_u32(port_num);
writer.write(data);
writer.flush();
}
//}
// upstream was probably closed => cleanup
arc.write(|open_ports| {
open_ports.remove(&port_num);
})
});
return Ok(upstream);
}
}
#[test]
fn test_multichannel() {
let (base_port1, base_chan1): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
let (base_port2, base_chan2): (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
let mux1 = MultiplexStream::new((base_port1, base_chan2));
let mux2 = MultiplexStream::new((base_port2, base_chan1));
// a MultiplexStream is UDP-like: It is not guaranteed that the remote
// host really receives the packet
// ...so we must ensure that both sides began listening before sending
// any data!
let (port1, chan1) = mux1.open(1).unwrap();
let (port2, chan2) = mux2.open(1).unwrap();
spawn(proc() {
let msg = ~[1 as u8];
chan1.send(msg);
let buf1 = port1.recv();
//assert!(buf1[0] == 2)
});
spawn(proc() {
let msg = ~[2 as u8];
chan2.send(msg);
let buf2 = port2.recv();
//assert!(buf2[0] == 1)
});
}
/*
#[test]
fn test_multichannel_quick_check() {
let base_stream1: (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
let base_stream2: (Port<~[u8]>, Chan<~[u8]>) = Chan::new();
do spawn {
let (port1, _) = base_stream1;
let (_, chan2) = base_stream2;
let data = port1.recv();
chan2.send(data);
}
do spawn {
let (_, chan1) = base_stream1;
let (port2, _) = base_stream2;
let data = port2.recv();
chan1.send(data);
}
let mux1 = MultiplexStream::new(base_stream1);
let mux2 = MultiplexStream::new(base_stream2);
// a MultiplexStream is UDP-like: It is not guaranteed that the remote
// host really receives the packet
let mux1 = MultiplexStream::new(base_stream1);
let mux2 = MultiplexStream::new(base_stream2);
for mux in (mux1, mux2) {
do spawn {
let open_ports = [];
loop {
let action: u8 = random();
match action {
0 if !open_ports.is_empty() => {
// send random data over a random channel
// the port is at least locally open, maybe not on remote
let len = (random::<u32>() % 4096)+1;
let data = do range(len).iter().map { random::<u8>() };
let port = open_ports.ind_sample();
port.send(data);
}
1 => {
// try to open a random, non-open port
for retry in range(100) {
let num = random::<u32>();
if !open_ports.any(|port| port.num == num) {
open_ports.append_one(mux.open(num));
break;
}
}
}
2 if !open_ports.is_empty() => {
// close a random, open port
let port = open_ports.ind_sample();
open_ports.remove(&port);
port.close();
}
3 => {
// try to receive data from a random, open port
let port = open_ports.ind_sample();
let data = port.recv();
}
}
sleep(random::<u8>() % 100); // in millisec
}
}
}
}
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment