-
-
Save jamesmunns/c92458fd54af690079ac5becf4a49bc7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::{sync::Arc, time::Duration}; | |
use maitake_sync::WaitQueue; | |
use tokio::{select, time::{sleep, timeout}}; | |
#[tokio::main(flavor = "multi_thread")] | |
async fn main() { | |
let stopper = Stopper::new(); | |
let fut = async { | |
let a = tokio::task::spawn(sleep_for(stopper.clone(), Duration::from_secs(60))); | |
let b = tokio::task::spawn(sleep_for(stopper.clone(), Duration::from_secs(60))); | |
let c = tokio::task::spawn(sleep_for(stopper.clone(), Duration::from_secs(60))); | |
let d = tokio::task::spawn(run_for(stopper.clone(), Duration::from_secs(5))); | |
let _ = a.await; | |
let _ = b.await; | |
let _ = c.await; | |
let _ = d.await; | |
}; | |
match timeout(Duration::from_secs(15), fut).await { | |
Ok(_) => println!("COMPLETED"), | |
Err(_) => println!("TIMED OUT"), | |
} | |
} | |
async fn run_for(stopper: Stopper, dur: Duration) { | |
let normal = async { | |
println!("Running for {dur:?}..."); | |
sleep(dur).await; | |
panic!("Farewell!"); | |
}; | |
select! { | |
_ = normal => {}, | |
_ = stopper.wait_stopped() => { | |
println!("Run for stopped early from stopper"); | |
}, | |
} | |
} | |
async fn sleep_for(stopper: Stopper, dur: Duration) { | |
let normal = async { | |
println!("Running for {dur:?}..."); | |
sleep(dur).await; | |
println!("Done!"); | |
}; | |
select! { | |
_ = normal => {}, | |
_ = stopper.wait_stopped() => { | |
println!("Sleep for stopped early from stopper"); | |
}, | |
} | |
} | |
/// A basic cancellation-token | |
/// | |
/// Used to terminate (and signal termination of) worker tasks | |
/// | |
/// (from postcard-rpc) | |
#[derive(Clone)] | |
pub struct Stopper { | |
inner: Arc<WaitQueue>, | |
} | |
impl Stopper { | |
/// Create a new Stopper | |
pub fn new() -> Self { | |
Self { | |
inner: Arc::new(WaitQueue::new()), | |
} | |
} | |
/// Wait until the stopper has been stopped. | |
/// | |
/// Once this completes, the stopper has been permanently stopped | |
pub async fn wait_stopped(&self) { | |
// This completes if we are awoken OR if the queue is closed: either | |
// means we're cancelled | |
let _ = self.inner.wait().await; | |
} | |
/// Have we been stopped? | |
pub fn is_stopped(&self) -> bool { | |
self.inner.is_closed() | |
} | |
/// Stop the stopper | |
/// | |
/// All current and future calls to [Self::wait_stopped] will complete immediately | |
pub fn stop(&self) { | |
self.inner.close(); | |
} | |
} | |
impl Default for Stopper { | |
fn default() -> Self { | |
Self::new() | |
} | |
} | |
#[cfg(feature = "roll")] | |
impl Drop for Stopper { | |
fn drop(&mut self) { | |
if std::thread::panicking() { | |
self.stop(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment