Skip to content

Instantly share code, notes, and snippets.

@joerg-krause
Created March 1, 2017 14:50
Show Gist options
  • Save joerg-krause/57f1b25d07c5a9d41d408fd0e137c453 to your computer and use it in GitHub Desktop.
Save joerg-krause/57f1b25d07c5a9d41d408fd0e137c453 to your computer and use it in GitHub Desktop.
tokio_uds
extern crate futures;
extern crate tokio_core;
extern crate tokio_uds;
use std::io;
use std::fs;
use std::str;
use futures::{Async, Future, Poll, Stream};
use tokio_core::io::read_to_end;
use tokio_core::reactor::{Handle, Core};
use tokio_uds::{UnixListener, UnixStream};
struct MessageStream {
task: Box<Future<Item=(), Error=io::Error>>,
}
impl Stream for MessageStream {
    type Item = Vec<u8>;
    type Error = io::Error;

    /// Polls the wrapped task once.
    ///
    /// An error from the task is propagated to the caller. Otherwise the
    /// stream always reports `NotReady`; it never yields an item itself.
    ///
    /// # Panics
    /// Panics if the wrapped task ever completes, which the author treats as
    /// impossible.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        if let Async::Ready(()) = self.task.poll()? {
            unreachable!();
        }
        Ok(Async::NotReady)
    }
}
/// Spawns a task on `handle` that reads `stream` to end-of-file and prints
/// the received bytes as a debug-formatted string.
///
/// The payload comes from an arbitrary client, so it is decoded lossily:
/// invalid UTF-8 sequences are replaced with U+FFFD instead of panicking
/// inside the spawned task. Read errors are reported on stderr rather than
/// being dropped silently.
///
/// Always returns `Ok(())`; the actual read happens later on the reactor.
fn handle_stream(handle: &Handle, stream: UnixStream) -> io::Result<()> {
    let reader = read_to_end(stream, Vec::new())
        .map(|(_, buf)| {
            // For valid UTF-8 this prints exactly what `{:?}` on the `&str` would.
            let cmd = String::from_utf8_lossy(&buf);
            println!("{:?}", cmd);
        })
        .map_err(|err| {
            // `spawn` needs `Error = ()`, so the I/O error is consumed here.
            eprintln!("failed to read from client: {}", err);
        });
    handle.spawn(reader);
    Ok(())
}
fn incoming(handle: &Handle) -> io::Result<MessageStream> {
static PATH: &'static str = "/var/run/hello.sock";
let listener = match UnixListener::bind(PATH, &handle) {
Ok(m) => m,
Err(_) => {
fs::remove_file(PATH).unwrap();
UnixListener::bind(PATH, &handle).unwrap()
}
};
let handle_ = handle.clone();
/* Runs the incoming stream to completion, executing the provided closure for each element
* on the stream, producing a future.
*/
let task = Box::new(listener.incoming().for_each(move |(stream, _)| {
handle_stream(&handle_, stream)
}));
Ok(MessageStream { task: task })
}
/// Top-level future handed to the reactor in `main`.
struct Main {
    /// Reactor handle passed to `incoming` when listening starts.
    handle: Handle,
    /// `None` until `listening` has been called.
    message_stream: Option<MessageStream>,
}
impl Main {
    /// Creates a `Main` that is not yet listening.
    fn new(handle: Handle) -> Main {
        Main {
            // `handle` is already owned, so it is moved in rather than cloned.
            handle: handle,
            message_stream: None,
        }
    }

    /// Sets up the listening socket via `incoming` and stores the resulting
    /// stream so that `poll` can drive it.
    ///
    /// # Panics
    /// Panics if `incoming` returns an error.
    fn listening(&mut self) {
        self.message_stream =
            Some(incoming(&self.handle).expect("failed to start listening on the Unix socket"));
    }
}
impl Future for Main {
    type Item = ();
    type Error = ();

    /// Drives the message stream, printing every item it yields.
    ///
    /// The stream is polled repeatedly until it reports `NotReady`, so a
    /// wakeup is registered before this future yields. Returning `NotReady`
    /// straight after a ready item would leave the task without one.
    ///
    /// * Stream ended: the future completes with `Ok(Async::Ready(()))`.
    /// * Stream error: the error is written to stderr and `Err(())` is
    ///   returned instead of panicking inside `poll`.
    /// * `listening` was never called: `NotReady` is returned, as before.
    fn poll(&mut self) -> Poll<(), ()> {
        loop {
            let polled = match self.message_stream.as_mut() {
                Some(stream) => stream.poll(),
                None => return Ok(Async::NotReady),
            };

            match polled {
                Ok(Async::Ready(Some(cmd))) => println!("cmd={:?}", cmd),
                Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                Err(err) => {
                    eprintln!("message stream failed: {}", err);
                    return Err(());
                }
            }
        }
    }
}
/// Entry point: creates the reactor, starts listening and runs the `Main`
/// future on it.
///
/// Panics if the reactor cannot be created, if listening fails, or if the
/// future resolves with an error.
fn main() {
    let mut reactor = Core::new().unwrap();
    let mut server = Main::new(reactor.handle());
    server.listening();
    reactor.run(server).unwrap()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment