Created
May 12, 2021 09:46
-
-
Save o0Ignition0o/5a27a062d480a5c96aec9a5026495cce to your computer and use it in GitHub Desktop.
A couple of probe runs and a configuration update
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[package] | |
name = "tmp-phjhjr" | |
version = "0.1.0" | |
authors = ["o0Ignition0o"] | |
edition = "2018" | |
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html | |
[dependencies] | |
anyhow = "1.0.40" | |
rand = "0.8.3" | |
tracing = "0.1.25" | |
tracing-subscriber = "0.2.17" | |
bastion = { git = "https://github.com/bastion-rs/bastion.git" } | |
once_cell = "1.7.2" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use anyhow::{anyhow, Result as AnyResult}; | |
use bastion::prelude::*; | |
use once_cell::sync::Lazy; | |
use std::sync::atomic::AtomicUsize; | |
use std::sync::atomic::Ordering; | |
use std::sync::Mutex; | |
use tracing::Level; | |
// this will be our state for now | |
static CONFIGURATION: Lazy<Mutex<Configuration>> = Lazy::new(|| { | |
Mutex::new(Configuration::new(vec![ | |
"https://foo.bar".to_string(), | |
"https://baz.qux".to_string(), | |
])) | |
}); | |
/// The list of URLs to probe plus a cursor for handing them out one by one.
///
/// Exists just for the sake of having something to do.
#[derive(Debug)]
struct Configuration {
    /// URLs that can be probed.
    urls_to_probe: Vec<String>,
    /// Number of URLs handed out so far; starts at zero.
    url_index: AtomicUsize,
}
/// Commands that can be passed to the sentinels.
#[derive(Debug)]
enum ProbeCommand {
    /// Probe a single URL.
    ProbeOne,
    /// Probe every configured URL.
    ProbeAll,
    /// Replace the configured URLs with the given list.
    ReplaceUrls(Vec<String>),
}
impl Configuration { | |
pub fn new(urls: Vec<String>) -> Self { | |
Self { | |
urls_to_probe: urls, | |
url_index: AtomicUsize::new(0), | |
} | |
} | |
pub fn next(&self) -> String { | |
let index = self.url_index.fetch_add(1, Ordering::SeqCst); | |
self.urls_to_probe[index % self.urls_to_probe.len()].clone() | |
} | |
pub fn all(&self) -> Vec<String> { | |
self.urls_to_probe.clone() | |
} | |
pub fn update(&mut self, urls: Vec<String>) { | |
tracing::warn!("updating configuration: {}", urls.join(" | ")); | |
self.urls_to_probe = urls; | |
self.url_index.store(0, Ordering::SeqCst) | |
} | |
} | |
/// RUST_LOG=info cargo run | |
fn main() -> AnyResult<()> { | |
let subscriber = tracing_subscriber::fmt() | |
.with_max_level(Level::INFO) | |
.finish(); | |
tracing::subscriber::set_global_default(subscriber).unwrap(); | |
// Sentinels group | |
Bastion::supervisor(|supervisor| { | |
supervisor.children(|children| { | |
// Iniit 5 sentinels which will probe stuff | |
children | |
.with_redundancy(5) | |
.with_distributor(Distributor::named("sentinels")) | |
.with_exec(run_probes) | |
}) | |
}) | |
.map_err(|_| anyhow!("couldn't setup the bastion"))?; | |
// Initialize bastion | |
Bastion::init(); | |
Bastion::start(); | |
// wait until bastion is ready | |
std::thread::sleep(std::time::Duration::from_secs(1)); | |
let sentinels = Distributor::named("sentinels"); | |
// probe "https://foo.bar" | |
sentinels | |
.tell_one(ProbeCommand::ProbeOne) | |
.expect("couldn't send probe one message"); | |
// probe "https://baz.qux" | |
sentinels | |
.tell_one(ProbeCommand::ProbeOne) | |
.expect("couldn't send probe one message"); | |
// probe "https://foo.bar" and "https://baz.qux" | |
sentinels | |
.tell_one(ProbeCommand::ProbeAll) | |
.expect("couldn't send probe all message"); | |
// wait until the previous probes are done | |
std::thread::sleep(std::time::Duration::from_secs(5)); | |
sentinels | |
.tell_one(ProbeCommand::ReplaceUrls(vec![ | |
"https://bastion.rs".to_string() | |
])) | |
.expect("couldn't update configuration"); | |
// probe "https://bastion.rs" | |
sentinels | |
.tell_one(ProbeCommand::ProbeOne) | |
.expect("couldn't send probe all message"); | |
// let's wait until all spawns are complete | |
std::thread::sleep(std::time::Duration::from_secs(5)); | |
Bastion::stop(); | |
Bastion::block_until_stopped(); | |
Ok(()) | |
} | |
async fn run_probes(ctx: BastionContext) -> Result<(), ()> { | |
loop { | |
if let Ok(message) = ctx | |
.try_recv_timeout(std::time::Duration::from_secs(2)) | |
.await | |
{ | |
tracing::info!("getting configuration"); | |
let mut configuration = CONFIGURATION | |
.lock() | |
.expect("couldn't read configuration, something terrible happened :("); | |
MessageHandler::new(message) | |
.on_tell(|command: ProbeCommand, _| match command { | |
ProbeCommand::ProbeOne => { | |
let url = configuration.next(); | |
spawn!(probe_url(url)); | |
} | |
ProbeCommand::ProbeAll => { | |
let urls = configuration.all(); | |
spawn!(probe_urls(urls)); | |
} | |
ProbeCommand::ReplaceUrls(urls) => { | |
configuration.update(urls); | |
} | |
}) | |
.on_fallback(|unknown, _sender_addr| { | |
tracing::error!( | |
"staff: uh oh, I received a message I didn't understand\n {:?}", | |
unknown | |
); | |
}); | |
} | |
} | |
} | |
async fn probe_url(url: String) { | |
tracing::info!("probing one {}", url); | |
// wait up to 3 seconds to simulate a probe run | |
std::thread::sleep(std::time::Duration::from_secs(rand::random::<u64>() % 3)); | |
} | |
async fn probe_urls(urls: Vec<String>) { | |
tracing::info!("probing all {}", urls.join(" | ")); | |
// wait up to 3 seconds to simulate a probe run | |
std::thread::sleep(std::time::Duration::from_secs(rand::random::<u64>() % 3)); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment