Skip to content

Instantly share code, notes, and snippets.

@gsquire
Created May 5, 2017 06:32
Show Gist options
  • Save gsquire/cb0bb6c0c7d48b57268c1254e98cbf39 to your computer and use it in GitHub Desktop.
Save gsquire/cb0bb6c0c7d48b57268c1254e98cbf39 to your computer and use it in GitHub Desktop.
extern crate byteorder;
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
use std::io::{self, Write};
use std::net::TcpStream;
use byteorder::{BigEndian, ByteOrder};
use futures::{Future, Stream};
use tokio_core::net::TcpStream as TokioTcp;
use tokio_core::reactor::Core;
use tokio_io::AsyncRead;
use tokio_io::codec::length_delimited;
use tokio_io::io::write_all;
fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let tcp = TcpStream::connect("127.0.0.1:4150").unwrap();
let tokio_tcp = TokioTcp::from_stream(tcp, &handle).unwrap();
let (reader, writer) = tokio_tcp.split();
// Send NSQ the setup information.
let prelude = write_all(writer, " V2".as_bytes()).and_then(|(stream, _)| {
write_all(stream, "SUB test chan\n".as_bytes())
}).and_then(|(stream, _)| {
write_all(stream, "RDY 10\n".as_bytes())
});
let framed_read = length_delimited::Builder::new().length_field_length(4).new_read(reader);
let reader = framed_read.for_each(|mut buf| {
let frame_type = BigEndian::read_i32(buf.as_ref());
println!("frame type is {}", frame_type);
if frame_type == 2 {
// Ditch the frame type.
buf.split_to(4);
let time_bytes = buf.split_to(8);
let time = BigEndian::read_i64(time_bytes.as_ref());
println!("time is {}", time);
let attempt_bytes = buf.split_to(2);
let attempts = BigEndian::read_u16(attempt_bytes.as_ref());
println!("attempts is {}", attempts);
let id_bytes = buf.split_to(16);
let id = ::std::str::from_utf8(id_bytes.as_ref()).unwrap();
println!("id is {}", id);
// TODO: Figure out how to share write half.
/*let finished = format!("FIN {}\n", id);
let fin_fut = prelude.and_then(move |(w, _)| {
write_all(w, finished.as_bytes())
}).map(|(_, _)| {
println!("mapping");
}).map_err(|_| {
println!("error map");
});
handle.spawn(fin_fut);*/
}
io::stdout().write_all(buf.as_ref()).unwrap();
print!("\n");
Ok(())
});
core.run(prelude.join(reader)).unwrap();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment