Skip to content

Instantly share code, notes, and snippets.

@o0Ignition0o
Created April 9, 2021 08:04
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save o0Ignition0o/2bbb59bf2fdc85c0ee97783b3c2115c6 to your computer and use it in GitHub Desktop.
Save o0Ignition0o/2bbb59bf2fdc85c0ee97783b3c2115c6 to your computer and use it in GitHub Desktop.
Bastion http probe using reqwest
[package]
name = "bastion_probe_test"
version = "0.1.0"
authors = ["o0Ignition0o <jeremy.lempereur@gmail.com>"]
edition = "2018"
[dependencies]
bastion = { git = "https://github.com/bastion-rs/bastion.git", features = ["tokio-runtime"] }
tokio = { version = "1.4", features = ["time", "macros"] }
url = "2.2"
reqwest = "0.11"
anyhow = "1.0.40"
tracing = "0.1.25"
tracing-subscriber = "0.2.17"
use anyhow::{anyhow, Error as AnyError};
use bastion::prelude::*;
use reqwest::{StatusCode, Url};
use tracing::{info, warn};
#[derive(Debug)]
struct HttpProbe {
status_code: StatusCode,
body: String,
}
impl HttpProbe {
pub fn new(status_code: StatusCode, body: String) -> Self {
Self { status_code, body }
}
}
type ProbeRunStatus = Result<HttpProbe, AnyError>;
#[tokio::main]
async fn main() {
init_logger();
Bastion::init();
let probes = Distributor::named("probes");
// Probes group
Bastion::supervisor(|supervisor| {
supervisor.children(|children| {
// let's spawn 5 probes
children
.with_redundancy(5)
.with_distributor(probes)
.with_exec(probe_loop)
})
})
.expect("couldn't setup the probe group");
loop {
let url = Url::parse("http://ipinfo.io/ip").expect("couldn't parse url");
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
// Ask a question
let answer = probes
.ask_one(url.clone())
.expect("couldn't send message to probes");
// Handle a reply
MessageHandler::new(answer.await.expect("couldn't get an answer")).on_tell(
|probe_result: ProbeRunStatus, _| {
info!("probe is done: {:?}", probe_result);
},
);
}
// We would want to gracefully teardown our bastion before we stop.
// This wont run because
Bastion::stop();
Bastion::block_until_stopped();
}
async fn probe_loop(ctx: BastionContext) -> Result<(), ()> {
let http_client = reqwest::Client::new();
loop {
// reqwest client wraps an Arc and implements clone
let client = http_client.clone();
warn!("{} is ready to accept a new message!", ctx.current().id());
MessageHandler::new(ctx.recv().await?).on_question(|url: Url, sender| {
// let's spawn the work somewhere else, so we can be ready to accept a new message
spawn!(async move {
// Perform the http request
let http_request = client.get(url).send().await;
// check if the network call went well
let reply_to_send = match http_request {
Ok(response) => {
// todo: maybe do something about an expected status code in a probe context?
let status = response.status();
// check if the response body is available
match response.text().await {
Ok(body) => ProbeRunStatus::Ok(HttpProbe::new(status, body)),
Err(http_error) => ProbeRunStatus::Err(anyhow!(
"couldn't get response body! {:#?}",
http_error
)),
}
}
Err(reqwest_error) => ProbeRunStatus::Err(anyhow!(
"couldn't make http request! {:#?}",
reqwest_error
)),
};
sender.reply(reply_to_send).expect("couldn't send reply");
});
});
}
}
fn init_logger() {
let subscriber = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
}
@timClicks
Copy link

timClicks commented Apr 9, 2021

Thanks for this. Very useful :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment