Last active
August 29, 2015 14:05
-
-
Save skade/7c9458634ebfef0ed202 to your computer and use it in GitHub Desktop.
A simple queue with network access in Rust. Send any non-empty line to localhost:1234 to insert it into the queue; send an empty line to pop an entry from the front of the queue and receive it in the reply.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::collections::dlist::DList; | |
use std::collections::Deque; | |
use std::io::net::tcp::{TcpListener,TcpStream}; | |
use std::io::{BufferedStream,Acceptor,Listener,Reader,Stream}; | |
/// A request that a client connection forwards to the queue task.
enum Command {
    /// Insert the carried string into the queue.
    Push(String),
    /// Remove an entry from the front of the queue and report it back.
    Pop,
}
#[deriving(Show)] | |
enum Response { | |
Accepted, | |
Data(Option<String>) | |
} | |
#[deriving(Send)] | |
struct Envelope<M: Send, R: Send> { | |
message: M, | |
reply_to: Sender<R> | |
} | |
fn main() { | |
let tcp_listener = TcpListener::bind("127.0.0.1", 1234).unwrap(); | |
tcp_handler(tcp_listener, queue_task()); | |
} | |
fn handle_stream<T: Stream>( | |
queue: Sender<Envelope<Command,Response>>, | |
mut buffer: BufferedStream<T> | |
) { | |
let command = read_command(&mut buffer); | |
let (sender, receiver) = channel::<Response>(); | |
queue.send(Envelope { reply_to: sender, message: command }); | |
let result = receiver.recv(); | |
match write!(buffer, "{}", result) { | |
Err(e) => fail!("Error writing Response: {}", e), | |
_ => {} | |
} | |
match buffer.flush() { | |
Err(e) => fail!("Error flushing response buffer {}", e), | |
_ => {} | |
} | |
drop(buffer); | |
} | |
fn read_command<T: Stream>(buffer: &mut BufferedStream<T>) -> Command { | |
let mut input = buffer.read_line().unwrap(); | |
input.pop_char(); | |
match input.len() { | |
0 => Pop, | |
_ => Push(input) | |
} | |
} | |
fn tcp_handler( | |
listener: TcpListener, | |
queue: Sender<Envelope<Command,Response>> | |
) { | |
let mut acceptor = listener.listen(); | |
for stream in acceptor.incoming() { | |
match stream { | |
Err(_) => fail!("connection error!"), | |
Ok(conn) => { | |
handler_task(&queue, conn); | |
} | |
} | |
} | |
} | |
fn queue_task() -> Sender<Envelope<Command,Response>> { | |
let (sender, receiver) = channel::<Envelope<Command,Response>>(); | |
spawn(proc() { | |
let mut queue = DList::new(); | |
for envelope in receiver.iter() { | |
let reply = match envelope.message { | |
Push(s) => { queue.push(s); Accepted }, | |
Pop => { Data(queue.pop_front()) } | |
}; | |
envelope.reply_to.send(reply); | |
} | |
}); | |
sender | |
} | |
fn handler_task(queue: &Sender<Envelope<Command,Response>>, conn: TcpStream) { | |
let cloned_sender = queue.clone(); | |
spawn(proc() { | |
handle_stream( | |
cloned_sender, | |
BufferedStream::new(conn) | |
); | |
}); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment