Skip to content

Instantly share code, notes, and snippets.

@o0Ignition0o
Created May 12, 2021 09:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save o0Ignition0o/5a27a062d480a5c96aec9a5026495cce to your computer and use it in GitHub Desktop.
Save o0Ignition0o/5a27a062d480a5c96aec9a5026495cce to your computer and use it in GitHub Desktop.
A couple of probe runs and a configuration update
[package]
name = "tmp-phjhjr"
version = "0.1.0"
authors = ["o0Ignition0o"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.40"
rand = "0.8.3"
tracing = "0.1.25"
tracing-subscriber = "0.2.17"
# NOTE(review): git dependency without a `rev`, `tag` or `branch` key, so it is not
# pinned to a specific commit in this manifest — consider pinning for reproducibility.
bastion = { git = "https://github.com/bastion-rs/bastion.git" }
once_cell = "1.7.2"
use anyhow::{anyhow, Result as AnyResult};
use bastion::prelude::*;
use once_cell::sync::Lazy;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
use tracing::Level;
// Shared probe configuration, lazily built on first access and guarded by a mutex.
// This is our only state for now.
static CONFIGURATION: Lazy<Mutex<Configuration>> = Lazy::new(|| {
    let initial_urls = vec![
        String::from("https://foo.bar"),
        String::from("https://baz.qux"),
    ];
    Mutex::new(Configuration::new(initial_urls))
});
/// The list of URLs to probe plus a cursor used to hand them out one at a time.
/// Just for the sake of having something to do.
#[derive(Debug)]
struct Configuration {
/// URLs handed out by `next` and `all`; replaced wholesale by `update`.
urls_to_probe: Vec<String>,
/// Counter bumped on every `next` call; taken modulo the URL count to pick an entry.
url_index: AtomicUsize,
}
/// The commands that can be passed to the sentinels (handled in `run_probes`).
#[derive(Debug)]
enum ProbeCommand {
/// Probe the next URL returned by `Configuration::next`.
ProbeOne,
/// Probe every URL currently in the configuration.
ProbeAll,
/// Replace the configured URL list with the given one.
ReplaceUrls(Vec<String>),
}
impl Configuration {
pub fn new(urls: Vec<String>) -> Self {
Self {
urls_to_probe: urls,
url_index: AtomicUsize::new(0),
}
}
pub fn next(&self) -> String {
let index = self.url_index.fetch_add(1, Ordering::SeqCst);
self.urls_to_probe[index % self.urls_to_probe.len()].clone()
}
pub fn all(&self) -> Vec<String> {
self.urls_to_probe.clone()
}
pub fn update(&mut self, urls: Vec<String>) {
tracing::warn!("updating configuration: {}", urls.join(" | "));
self.urls_to_probe = urls;
self.url_index.store(0, Ordering::SeqCst)
}
}
/// Demo entry point: starts a bastion system with five sentinels, sends them a
/// couple of probe commands through the "sentinels" distributor, replaces the
/// URL list, probes once more and finally shuts the system down.
///
/// Run with `cargo run`. The maximum log level is set in code to INFO.
/// NOTE(review): no env filter is installed on the subscriber, so `RUST_LOG`
/// is presumably not consulted — confirm before relying on it.
///
/// # Errors
///
/// Returns an error if the global tracing subscriber cannot be installed or if
/// the supervisor cannot be set up.
///
/// # Panics
///
/// Panics if a command cannot be handed to the "sentinels" distributor.
fn main() -> AnyResult<()> {
    let subscriber = tracing_subscriber::fmt()
        .with_max_level(Level::INFO)
        .finish();
    // Report the failure through the returned Result instead of panicking.
    tracing::subscriber::set_global_default(subscriber)
        .map_err(|_| anyhow!("couldn't set the global tracing subscriber"))?;

    // Initialize bastion before using any of its other features
    Bastion::init();

    // Sentinels group
    Bastion::supervisor(|supervisor| {
        supervisor.children(|children| {
            // Init 5 sentinels which will probe stuff
            children
                .with_redundancy(5)
                .with_distributor(Distributor::named("sentinels"))
                .with_exec(run_probes)
        })
    })
    .map_err(|_| anyhow!("couldn't setup the bastion"))?;

    Bastion::start();

    // wait until bastion is ready
    std::thread::sleep(std::time::Duration::from_secs(1));

    let sentinels = Distributor::named("sentinels");

    // probe "https://foo.bar"
    sentinels
        .tell_one(ProbeCommand::ProbeOne)
        .expect("couldn't send probe one message");

    // probe "https://baz.qux"
    sentinels
        .tell_one(ProbeCommand::ProbeOne)
        .expect("couldn't send probe one message");

    // probe "https://foo.bar" and "https://baz.qux"
    sentinels
        .tell_one(ProbeCommand::ProbeAll)
        .expect("couldn't send probe all message");

    // wait until the previous probes are done
    std::thread::sleep(std::time::Duration::from_secs(5));

    sentinels
        .tell_one(ProbeCommand::ReplaceUrls(vec![
            "https://bastion.rs".to_string()
        ]))
        .expect("couldn't update configuration");

    // probe "https://bastion.rs"
    // NOTE(review): nothing here waits for the update above to be applied before
    // this command is handled — confirm the ordering holds across sentinels.
    sentinels
        .tell_one(ProbeCommand::ProbeOne)
        .expect("couldn't send probe one message");

    // let's wait until all spawns are complete
    std::thread::sleep(std::time::Duration::from_secs(5));

    Bastion::stop();
    Bastion::block_until_stopped();

    Ok(())
}
/// Body of a single sentinel: receives `ProbeCommand`s and acts on them.
///
/// Every iteration waits up to two seconds for a message. When one arrives, the
/// shared `CONFIGURATION` is locked and the message is dispatched:
///
/// * `ProbeOne` spawns a probe of the next configured URL,
/// * `ProbeAll` spawns a probe of every configured URL,
/// * `ReplaceUrls` installs a new URL list,
/// * anything else is logged as an error.
///
/// A receive that does not yield a message is ignored and the loop starts over;
/// the loop itself has no exit.
///
/// # Panics
///
/// Panics if the `CONFIGURATION` mutex is poisoned.
async fn run_probes(ctx: BastionContext) -> Result<(), ()> {
    let poll_timeout = std::time::Duration::from_secs(2);
    loop {
        let received = ctx.try_recv_timeout(poll_timeout).await;
        if let Ok(message) = received {
            tracing::info!("getting configuration");
            // The guard is confined to this block, which contains no `.await`.
            let mut config = CONFIGURATION
                .lock()
                .expect("couldn't read configuration, something terrible happened :(");
            MessageHandler::new(message)
                .on_tell(|command: ProbeCommand, _| match command {
                    ProbeCommand::ReplaceUrls(urls) => config.update(urls),
                    ProbeCommand::ProbeAll => {
                        spawn!(probe_urls(config.all()));
                    }
                    ProbeCommand::ProbeOne => {
                        spawn!(probe_url(config.next()));
                    }
                })
                .on_fallback(|unexpected, _sender_addr| {
                    tracing::error!(
                        "staff: uh oh, I received a message I didn't understand\n {:?}",
                        unexpected
                    );
                });
        }
    }
}
/// Simulates probing a single URL.
///
/// Logs the URL at INFO level, then sleeps for a random whole number of
/// seconds: 0, 1 or 2.
///
/// NOTE(review): `std::thread::sleep` blocks the thread polling this future
/// instead of yielding to the executor.
async fn probe_url(url: String) {
    tracing::info!("probing one {}", url);
    // `% 3` keeps the simulated probe run between 0 and 2 seconds
    let pause_secs = rand::random::<u64>() % 3;
    std::thread::sleep(std::time::Duration::from_secs(pause_secs));
}
/// Simulates probing a batch of URLs in one run.
///
/// Logs the URLs joined with `" | "` at INFO level, then sleeps once for a
/// random whole number of seconds: 0, 1 or 2.
///
/// NOTE(review): `std::thread::sleep` blocks the thread polling this future
/// instead of yielding to the executor.
async fn probe_urls(urls: Vec<String>) {
    tracing::info!("probing all {}", urls.join(" | "));
    // `% 3` keeps the simulated probe run between 0 and 2 seconds
    let pause_secs = rand::random::<u64>() % 3;
    std::thread::sleep(std::time::Duration::from_secs(pause_secs));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment