これは Rust of Us - Chapter 3 で使う資料です。 TRPL の cocurrency をベースにまとめています。 サンプルコードは、そのまま利用しています。
Rust の Concurrency で大事になる Trait が 2 つある。
- Sync
- Send
このトレイトを実装すると、コンパイラに複数のスレッド間でデータを共有しても問題無いことを教えます。 Sync を実装した例だと Arc という、アトミックなリファレンスカウントを使ってイミュータブルなデータをスレッド間で安全に共有することができる型がある。
このトレイトを実装すると、スレッド間で安全にオーナーシップを受け渡すことができる。 チャンネルを使ってデータのやりとりをするときにも、このトレイトが実装されている必要がある。
例えば、FFI のラッピングのようなスレッドセーフでないデータをには、Send を実装すべきでない。 こうすることでコンパイラが、スレッドからデータを移動すべきでないとエラーをだしてくれる。
thread:spawn を使うとスレッドを起動することができる。Ruby と同じで、渡したクロージャが別スレッドとして実行される。
use std::thread; fn main() { thread::spawn(|| { println!("Hello from a thread!"); }); }
spawn はハンドラーを返し、スレッドを join することでスレッドから値を返すことができる。
use std::thread; fn main() { let handle = thread::spawn(|| { "Hello from a thread!" }); println!("{}", handle.join().unwrap()); }
スレッド間でデータを共有するとき、スレッドセーフでないデータはコンパイル時にエラーとなる。
use std::thread; fn main() { let mut data = vec![1, 2, 3]; for i in 0..3 { thread::spawn(move || { data[i] += 1; }); } thread::sleep_ms(50); } // コンパイルエラー // // 8:17 error: capture of moved value: `data` // data[i] += 1; // ^~~~
スレッド間でデータを共有するには、前述の 2 つのトレイトを使う必要がある。
まず、Sync を利用するために Arc を使う。Acr は、標準の AtomicReferenceCount を提供する型で、複数のリファレンス間で同じデータのオーナーシップを 共有を可能にする。Arc は、リファレンスの数をカウントしてくれる。Arc のカウントは、コンパイラがアトミックに処理するようにしてくれる。
Atomic なため複数スレッド間でも安全にアクセスできる。これは、データ競合が発生しない不可分の内部カウントをコンパイラが提供してくれる。 Arc::clone() を呼びだすと内部カウントが増える。
use std::sync::Arc; let mut data = Arc::new(vec![1, 2, 3]); let data = data.clone();
ただし、Arc はラップするデータが Sync を実装していることを期待している。 そのため、このコードはエラーとなる。
:11:24 error: cannot borrow immutable borrowed content as mutable :11 data[i] += 1; ^~~~
スレッド間で共有されるデータは、イミュータブルであることが望ましいがミュータブルなデータを扱いたい。 こんなときは、Mutex を使う。Mutext を使うと、そのタイミングで一つのスレッドだけが値を変更することを可能にします。
Mutext のみをスレッドで使うと、エラーが発生します。そのため、Arc でラップする必要がある。
error: the trait `core::marker::Send` is not implemented for the type `std::sync::mutex::MutexGuard<'_, collections::vec::Vec>` [E0277] thread::spawn(move || { ^~~~~~~~~~~~~ note: `std::sync::mutex::MutexGuard<'_, collections::vec::Vec>` cannot be sent between threads safely thread::spawn(move || { ^~~~~~~~~~~~~
Mutex の値を使うときは、Mutex::lock() を使う。lock() を使うと LockResult<MutexGuard> が返ってくる (MutexGuardが Send を実装していないため、Arc 無しでスレッドで使えない)。 ロックを取得すると、free されるまでロックを保持し続ける。 この方法では、親スレッドが子スレッドの情報を得るには、スレッドが終了するまで少し待つ必要がある。
use std::sync::Mutex; fn main() { let data = Arc::new(Mutex::new(vec![1, 2, 3])); for i in 0..3 { let data = data.clone(); thread::spawn(move || { let mut data = data.lock().unwrap(); data[i] += 1; }); } thread::sleep_ms(50); }
lock() を実行したときに、他のスレッドがロックを獲得していると Result が Error となるので、それでエラーハンドリングをする。
スレッド間でデータをやりとりする別のやり型として、channels がある。この方法だと、待たなくてよい。
use std::sync::{Arc, Mutex}; use std::thread; use std::sync::mpsc; fn main() { let data = Arc::new(Mutex::new(0)); let (tx, rx) = mpsc::channel(); for _ in 0..10 { let (data, tx) = (data.clone(), tx.clone()); thread::spawn(move || { let mut data = data.lock().unwrap(); *data += 1; tx.send(()); }); } for _ in 0..10 { rx.recv(); } }
channel は Send を持っているデータを送ることができる。channel はデータを送るとシグナルも送信する。そうすることで、channel からデータを受けとることができる。
use std::thread; use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); for _ in 0..10 { let tx = tx.clone(); thread::spawn(move || { let answer = 42; tx.send(answer); }); } rx.recv().ok().expect("Could not receive answer"); }
panic! を使うと、実行しているスレッドをクラッシュすることができる。 パニックしたスレッドは Result を返すため、親スレッドは子スレッドがパニックしたかどうかを確認することができる。
A panic! will crash the currently executing thread. You can use Rust's threads as a simple isolation mechanism:
use std::thread; let result = thread::spawn(move || { panic!("oops!"); }).join(); assert!(result.is_err());
クロージャの前に付いている move は、所有権をクロージャに移すためのもの。