Skip to content

Instantly share code, notes, and snippets.

@o0Ignition0o
Created May 12, 2021 13:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save o0Ignition0o/bb857da3b4b3d8d62e7f66131b87518a to your computer and use it in GitHub Desktop.
Save o0Ignition0o/bb857da3b4b3d8d62e7f66131b87518a to your computer and use it in GitHub Desktop.
Let's spawn and kill stuff!
[package]
name = "tmp-phjhjr"
version = "0.1.0"
authors = ["o0Ignition0o <jeremy.lempereur@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
anyhow = "1.0.40"
rand = "0.8.3"
tracing = "0.1.25"
tracing-subscriber = "0.2.17"
bastion = { git = "https://github.com/bastion-rs/bastion.git" }
once_cell = "1.7.2"
use anyhow::{anyhow, Result as AnyResult};
use bastion::prelude::*;
use tracing::Level;
// simulate our servers
#[derive(Debug, Clone)]
struct Configuration {
pub maybe_blow_up_after: std::time::Duration,
pub distributor_name: String,
}
/// RUST_LOG=info cargo run
fn main() -> AnyResult<()> {
let subscriber = tracing_subscriber::fmt()
.with_max_level(Level::INFO)
.finish();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Sentinels group
let start_conf = get_conf();
// Initialize bastion
Bastion::init();
Bastion::start();
let _ = start_conf.into_iter().map(|server_conf| {
server_supervisor_from_conf(server_conf)
}).collect::<Vec<_>>();
// let s trigger some errors and see if ppl respawn
std::thread::sleep(std::time::Duration::from_secs(20));
Bastion::stop();
Bastion::block_until_stopped();
Ok(())
}
fn server_supervisor_from_conf(server_conf: Configuration) -> AnyResult<SupervisorRef> {
Bastion::supervisor(|supervisor| {
supervisor.children(|children| {
// Iniit 5 sentinels which will probe stuff
children
.with_redundancy(1)
.with_distributor(Distributor::named(server_conf.distributor_name.as_str()))
.with_exec(move |ctx| {
tracing::warn!("child started: {} / {}", server_conf.distributor_name.as_str(), ctx.current().id() );
let server_conf = server_conf.clone();
async move {
loop {
if let Ok(message) = ctx
.try_recv_timeout(server_conf.maybe_blow_up_after)
.await
{
tracing::info!("received a message.");
MessageHandler::new(message)
.on_fallback(|unknown, _sender_addr| {
tracing::error!(
"staff: uh oh, I received a message I didn't understand\n but it actually doesn't matter yet {:?}",
unknown
);
});
} else {
if rand::random::<bool>() {
tracing::error!("child stopping: {} / {}", server_conf.distributor_name.as_str(), ctx.current().id() );
panic!("oops I blew up!");
}
}
}
}
})
})
})
.map_err(|_| anyhow!("couldn't setup the server supervisor"))
}
fn get_conf() -> Vec<Configuration> {
vec![
Configuration {
maybe_blow_up_after: std::time::Duration::from_secs(2),
distributor_name: "server one".to_string(),
},
Configuration {
maybe_blow_up_after: std::time::Duration::from_secs(3),
distributor_name: "server two".to_string(),
},
Configuration {
maybe_blow_up_after: std::time::Duration::from_secs(5),
distributor_name: "server three".to_string(),
},
]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment