Skip to content

Instantly share code, notes, and snippets.

@jamesmunns
Created November 6, 2024 19:15
Show Gist options
  • Save jamesmunns/c92458fd54af690079ac5becf4a49bc7 to your computer and use it in GitHub Desktop.
Save jamesmunns/c92458fd54af690079ac5becf4a49bc7 to your computer and use it in GitHub Desktop.
use std::{sync::Arc, time::Duration};
use maitake_sync::WaitQueue;
use tokio::{select, time::{sleep, timeout}};
#[tokio::main(flavor = "multi_thread")]
async fn main() {
let stopper = Stopper::new();
let fut = async {
let a = tokio::task::spawn(sleep_for(stopper.clone(), Duration::from_secs(60)));
let b = tokio::task::spawn(sleep_for(stopper.clone(), Duration::from_secs(60)));
let c = tokio::task::spawn(sleep_for(stopper.clone(), Duration::from_secs(60)));
let d = tokio::task::spawn(run_for(stopper.clone(), Duration::from_secs(5)));
let _ = a.await;
let _ = b.await;
let _ = c.await;
let _ = d.await;
};
match timeout(Duration::from_secs(15), fut).await {
Ok(_) => println!("COMPLETED"),
Err(_) => println!("TIMED OUT"),
}
}
async fn run_for(stopper: Stopper, dur: Duration) {
let normal = async {
println!("Running for {dur:?}...");
sleep(dur).await;
panic!("Farewell!");
};
select! {
_ = normal => {},
_ = stopper.wait_stopped() => {
println!("Run for stopped early from stopper");
},
}
}
async fn sleep_for(stopper: Stopper, dur: Duration) {
let normal = async {
println!("Running for {dur:?}...");
sleep(dur).await;
println!("Done!");
};
select! {
_ = normal => {},
_ = stopper.wait_stopped() => {
println!("Sleep for stopped early from stopper");
},
}
}
/// A basic cancellation-token
///
/// Used to terminate (and signal termination of) worker tasks
///
/// (from postcard-rpc)
#[derive(Clone)]
pub struct Stopper {
inner: Arc<WaitQueue>,
}
impl Stopper {
/// Create a new Stopper
pub fn new() -> Self {
Self {
inner: Arc::new(WaitQueue::new()),
}
}
/// Wait until the stopper has been stopped.
///
/// Once this completes, the stopper has been permanently stopped
pub async fn wait_stopped(&self) {
// This completes if we are awoken OR if the queue is closed: either
// means we're cancelled
let _ = self.inner.wait().await;
}
/// Have we been stopped?
pub fn is_stopped(&self) -> bool {
self.inner.is_closed()
}
/// Stop the stopper
///
/// All current and future calls to [Self::wait_stopped] will complete immediately
pub fn stop(&self) {
self.inner.close();
}
}
impl Default for Stopper {
fn default() -> Self {
Self::new()
}
}
#[cfg(feature = "roll")]
impl Drop for Stopper {
fn drop(&mut self) {
if std::thread::panicking() {
self.stop();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment