Skip to content

Instantly share code, notes, and snippets.

@skade skade/queue.rs
Last active Aug 29, 2015

Embed
What would you like to do?
A simple queue with network access in Rust. Send any non-empty string to localhost:1234 to insert it, send "" to retrieve it.
use std::collections::dlist::DList;
use std::collections::Deque;
use std::io::net::tcp::{TcpListener,TcpStream};
use std::io::{BufferedStream,Acceptor,Listener,Reader,Stream};
enum Command {
Push(String),
Pop
}
#[deriving(Show)]
enum Response {
Accepted,
Data(Option<String>)
}
#[deriving(Send)]
struct Envelope<M: Send, R: Send> {
message: M,
reply_to: Sender<R>
}
fn main() {
let tcp_listener = TcpListener::bind("127.0.0.1", 1234).unwrap();
tcp_handler(tcp_listener, queue_task());
}
fn handle_stream<T: Stream>(
queue: Sender<Envelope<Command,Response>>,
mut buffer: BufferedStream<T>
) {
let command = read_command(&mut buffer);
let (sender, receiver) = channel::<Response>();
queue.send(Envelope { reply_to: sender, message: command });
let result = receiver.recv();
match write!(buffer, "{}", result) {
Err(e) => fail!("Error writing Response: {}", e),
_ => {}
}
match buffer.flush() {
Err(e) => fail!("Error flushing response buffer {}", e),
_ => {}
}
drop(buffer);
}
fn read_command<T: Stream>(buffer: &mut BufferedStream<T>) -> Command {
let mut input = buffer.read_line().unwrap();
input.pop_char();
match input.len() {
0 => Pop,
_ => Push(input)
}
}
fn tcp_handler(
listener: TcpListener,
queue: Sender<Envelope<Command,Response>>
) {
let mut acceptor = listener.listen();
for stream in acceptor.incoming() {
match stream {
Err(_) => fail!("connection error!"),
Ok(conn) => {
handler_task(&queue, conn);
}
}
}
}
fn queue_task() -> Sender<Envelope<Command,Response>> {
let (sender, receiver) = channel::<Envelope<Command,Response>>();
spawn(proc() {
let mut queue = DList::new();
for envelope in receiver.iter() {
let reply = match envelope.message {
Push(s) => { queue.push(s); Accepted },
Pop => { Data(queue.pop_front()) }
};
envelope.reply_to.send(reply);
}
});
sender
}
fn handler_task(queue: &Sender<Envelope<Command,Response>>, conn: TcpStream) {
let cloned_sender = queue.clone();
spawn(proc() {
handle_stream(
cloned_sender,
BufferedStream::new(conn)
);
});
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.