-
-
Save rust-play/5fe6b1371e9acc6e7b573b2709f5a1c4 to your computer and use it in GitHub Desktop.
Code shared from the Rust Playground
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| //! Reproducer: tokio::sync::Mutex deadlock with a single, unlocked mutex. | |
| //! | |
| //! Cargo.toml: | |
| //! [dependencies] | |
| //! tokio = { version = "1", features = ["full"] } | |
| use std::future::Future; | |
| use std::pin::Pin; | |
| use std::sync::Arc; | |
| use std::sync::atomic::{AtomicBool, Ordering}; | |
| use std::task::{Context, Poll}; | |
| use std::time::Duration; | |
| use tokio::sync::Mutex; | |
| /// Future wrapper that stops forwarding polls when `stopped` is set. | |
| struct PausableFuture<F> { | |
| inner: Option<Pin<Box<F>>>, | |
| stopped: Arc<AtomicBool>, | |
| } | |
| impl<F: Future> Future for PausableFuture<F> { | |
| type Output = F::Output; | |
| fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | |
| if self.stopped.load(Ordering::Acquire) { | |
| let _ = self.inner.take(); | |
| return Poll::Pending; | |
| } | |
| if let Some(inner) = &mut self.inner { | |
| return inner.as_mut().poll(cx); | |
| } | |
| panic!("Cannot unpause a PausableFuture") | |
| } | |
| } | |
| #[tokio::main(flavor = "current_thread")] | |
| async fn main() { | |
| let mutex = Arc::new(Mutex::new(())); | |
| // Hold the lock so workers queue up. | |
| let guard = mutex.lock().await; | |
| println!("main acquired the lock"); | |
| // Spawn 4 workers, each wrapped in a PausableFuture. | |
| // Each worker gets its own flag - whether to stop or not. | |
| let stop_flags: Vec<_> = (0..4).map(|_| Arc::new(AtomicBool::new(false))).collect(); | |
| let mut handles = Vec::new(); | |
| for id in 0..4u32 { | |
| let mutex = Arc::clone(&mutex); | |
| let stopped = Arc::clone(&stop_flags[id as usize]); | |
| handles.push(tokio::spawn(PausableFuture { | |
| stopped, | |
| inner: Some(Box::pin(async move { | |
| let guard = mutex.lock().await; | |
| println!("worker {id}: acquired lock"); | |
| tokio::time::sleep(Duration::from_millis(100)).await; | |
| drop(guard); | |
| println!("worker {id}: released lock"); | |
| })), | |
| })); | |
| } | |
| // Yield so all workers poll lock() and start waiting. | |
| tokio::task::yield_now().await; | |
| // Stop polling worker 1. | |
| stop_flags[1].store(true, Ordering::Release); | |
| // Release the lock. The mutex is now UNLOCKED. | |
| drop(guard); | |
| println!("main released the lock"); | |
| for (id, h) in handles.into_iter().enumerate() { | |
| match tokio::time::timeout(std::time::Duration::from_secs(2), h).await { | |
| Ok(Ok(())) => println!("worker {id}: done"), | |
| Ok(Err(e)) => println!("worker {id}: error: {e}"), | |
| Err(_) => println!("worker {id}: DEADLOCKED"), | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment