Skip to content

Instantly share code, notes, and snippets.

@jkilpatr
Created May 31, 2019 12:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jkilpatr/c348131160bee803cb1f0c5a93ccb6fd to your computer and use it in GitHub Desktop.
Save jkilpatr/c348131160bee803cb1f0c5a93ccb6fd to your computer and use it in GitHub Desktop.
/// Opens a tcpstream to the babel management socket using a standard timeout
/// for both the open and read operations
pub fn open_babel_stream(babel_port: u16) -> ConnectFuture {
let socket_string = format!("[::1]:{}", babel_port);
trace!("About to open Babel socket using {}", socket_string);
let socket: SocketAddr = socket_string.parse().unwrap();
TcpStream::connect(&socket)
}
fn read_babel(
stream: TcpStream,
depth: usize,
) -> Box<Future<Item = (TcpStream, String), Error = Error>> {
let buffer = Vec::new();
Box::new(read(stream, buffer).then(move |result| {
if result.is_err() {
return Box::new(future::err(TokioError(format!("{:?}", result)).into()))
as Box<Future<Item = (TcpStream, String), Error = Error>>;
}
let (stream, buffer, bytes) = result.unwrap();
// we got nothing, try again for up to 3s
let when = Instant::now() + Duration::from_secs(10);
if bytes == 0 && depth <= 500 {
trace!("Nothing on the stream yet, trying again");
return Box::new(
Delay::new(when)
.map_err(move |e| panic!("timer failed; err={:?}", e))
.and_then(move |_| read_babel(stream, depth + 1)),
) as Box<Future<Item = (TcpStream, String), Error = Error>>;
}
let output = String::from_utf8(buffer);
if let Err(e) = output {
return Box::new(future::err(ReadFailed(format!("{:?}", e)).into()))
as Box<Future<Item = (TcpStream, String), Error = Error>>;
}
let output = output.unwrap();
trace!(
"Babel monitor got {} bytes after {} tries with the message {}",
bytes,
depth,
output
);
let babel_data = read_babel_sync(&output);
if let Err(e) = babel_data {
warn!("Babel read failed! {} {:?}", output, e);
return Box::new(future::err(ReadFailed(format!("{:?}", e)).into()))
as Box<Future<Item = (TcpStream, String), Error = Error>>;
}
let babel_data = babel_data.unwrap();
Box::new(future::ok((stream, babel_data)))
as Box<Future<Item = (TcpStream, String), Error = Error>>
}))
}
fn read_babel_sync(output: &str) -> Result<String, Error> {
let mut ret = String::new();
for line in output.lines() {
ret.push_str(line);
ret.push_str("\n");
match line.trim() {
"ok" => {
trace!(
"Babel returned ok; full output:\n{}\nEND OF BABEL OUTPUT",
ret
);
return Ok(ret);
}
"bad" | "no" => {
warn!(
"Babel returned bad/no; full output:\n{}\nEND OF BABEL OUTPUT",
ret
);
return Err(ReadFailed(ret).into());
}
_ => continue,
}
}
warn!(
"Terminator was never found; full output:\n{:?}\nEND OF BABEL OUTPUT",
ret
);
return Err(NoTerminator(ret).into());
}
pub fn run_command(
stream: TcpStream,
cmd: &str,
) -> Box<Future<Item = (TcpStream, String), Error = Error>> {
trace!("Running babel command {}", cmd);
let mut bytes = Vec::new();
bytes.clone_from_slice(&cmd.as_bytes());
let cmd = cmd.to_string();
Box::new(write_all(stream, bytes).then(move |out| {
if out.is_err() {
return Box::new(Either::A(future_result(Err(CommandFailed(
cmd,
format!("{:?}", out),
)
.into()))));
}
let (stream, _res) = out.unwrap();
Box::new(Either::B(read_babel(stream, 0)))
}))
}
// Consumes the automated Preamble and validates configuration api version
pub fn start_connection(stream: TcpStream) -> Box<Future<Item = TcpStream, Error = Error>> {
trace!("Starting babel connection");
Box::new(read_babel(stream, 0).then(|result| {
if let Err(e) = result {
return Err(e);
}
let (stream, preamble) = result.unwrap();
validate_preamble(preamble)?;
Ok(stream)
}))
}
fn validate_preamble(preamble: String) -> Result<(), Error> {
// Note you have changed the config interface, bump to 1.1 in babel
if preamble.contains("ALTHEA 0.1") {
trace!("Attached OK to Babel with preamble: {}", preamble);
return Ok(());
} else {
return Err(InvalidPreamble(preamble).into());
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment