- 後続の章では様々な形式の__プール__と呼ばれるオブジェクト群を見ていく
- プールは__セット__に似ているが、主に以下の二点が異なる
contains()
を提供しない- 同じアイテムが重複して出現することが許されている
プールのインターフェース(Fig. 10.1):
public interface Pool<T> {
void set(T item);
T get();
}
- プールは並列システムでは良く見かける
- 生産者/消費者パターン
- 生産者の方がペースが早い(bursty)場合に備えてバッファが必要
- バッファとして、プールが利用可能
プールのバラエティ:
- 上限付き or 上限無し
- 上限付きなら、生産者と消費者の間で(緩い)同期が可能。また実装がより単純
- 生産者と消費者のペースに制限を付ける必要性がないなら、上限無しの方が有用
- 提供するメソッドが total or partial or synchronous
- total: 処理が進むのに特定の状態を要求しない
- ex. get(), boundedなset() => error or exception
- partial: 処理が進むのに特定の状態を要求する。状態が満たされないなら待機
- ex. non empty pool get()
- synchronous: 他のスレッドのメソッド呼び出しと協調(同期)する
- プールに要素を追加する際には、その要素を誰かが消費するまで、メソッド呼び出しがブロックする
- erlangで云う
gen_server:call/3
- total: 処理が進むのに特定の状態を要求しない
- いろいろな公平性保障
- first-in-first-out (a queue)
- last-in-first-out (a stack)
- or other, weaker properties
この章では特にFIFOのキューを対象とする:
enq()
,deq()
- 並列キューは直接キューに線形化される
- 異なる公平性保障を考慮
注意:
- 簡潔性のために
null
をキューの要素として入れることはできないものとする
どの程度の並列性が上限付きキューに期待できるか?
enq()
とdeq()
はキューの両端に対する操作なので、原理上は干渉なしで実行可能なはず- キューが空 or 満杯のケースは除く
- 実際には(例えば)
enq()
呼び出しがdeq()
に干渉して、後者の実行を待たせることもある - 実際の並列性を推し量るのは容易ではない
上限付きキューのリンクリストによる実装:
// Fig.10.2
public class BoundedQueue<T> {
ReentrantLock enqLock, deqLock;
Condition notEmptyCondition, notFullCondition;
AtomicInteger size;
volatile Node head, tail;
int capacity;
public BoundedQueue(int _capacity) {
capacity = _capacity;
head = new Node(null);
tail = head;
size = new AtomicInteger(0);
enqLock = new ReentrantLock();
nonFullCondition = enqLock.newCondition();
deqLock = new ReentrantLock();
nonEmptyCondition = deqLock.newCondition();
}
// Fig. 10.5
protected class Node {
public T value;
public volatile Node next;
public Node(T x) {
value = x;
next = null;
}
}
// Fig. 10.3
public void enq(T x) {
boolean mustWakeDequeuers = false;
// キューの末尾に要素を追加
enqLock.lock();
try {
while (size.get() == capacity)
notFullCondition.await();
Node e = new Node(x);
tail.next = e;
tail = tail.next;
if (size.getAndIncrement() == 0)
mustWakeDequeuers = true;
} finally {
enqLock.unlock();
}
// 必要なら消費者に通知
if (mustWakeDequeuers) {
deqLock.lock();
try {
notEmptyCondition.signalAll();
} finally {
deqLock.unlock();
}
}
}
// Fig.10.4
public T deq() {
T result;
boolean mustWakeEnqueuers = false;
// キューの先頭から要素を取り出し
deqLock.lock();
try {
while (size.get() == 0)
notEmptyCondition.await();
result = head.next.value;
head = head.next;
if (size.getAndDecrement() == capacity)
mustWakeEnqueuers = true;
} finally {
deqLock.unlock();
}
// 必要なら生産者に通知
if (mustWakeEnqueuers) {
enqLock.lock();
try {
notFullCondition.signalAll();
} finally {
enqLock.unlock();
}
}
return result;
}
}
補足:
- Fig.10.6:
- キューの処理の図示
- Chapter9のリストの実装とは異なり、今回は番兵ノードが繰り返し更新される
enqLock
とdeqLock
- 同時に実行されるenqueuerは一つのみ、dequeuerも同様
- ! つまり、この実装の並列度はあまり高くない
- ロックを二つに分けているのは、不必要に(例えば)前者が後者をブロックしなくて済むように
- 同時に実行されるenqueuerは一つのみ、dequeuerも同様
notEmptyCondition
とnotFullCondition
- Chapter 8とだいたい同様 (おそらく)
- "lost-wakeup"とかにも気を付けています
lost-wakeupについて:
- 以下のケースが起こり得ないか
- enq:
size
==capacity - deq:
size
をデクリメント - deq:
notFullCondition
を通知 - enq:
notFullCondition
で待機 (既に通知済みなので、延々と待機)
- enq:
- 1と4は
enqLock
の中で実行され、deqもenqLock
は獲得しようとするので大丈夫
抽象キューの先頭および末尾要素は、必ずしもhead
およびtail
が指すものと一致しない:
- 論理的には
tail.next
にノードが設定された時点で、要素が追加されているtail
フィールドが更新される前でもenq()
とdeq()
は独立なので、head
の方が先に新しいノードを参照する可能性もある
- !
size
を見る関係上、tail.next
の更新だけでは論理的にも追加されないのでは? (10.4のキューなら話は別)
この実装の欠点の一つ:
- ロックを通さない箇所でも、
enq()
とdeq()
が干渉するsize
フィールドに対するgetAndIncrement()
およびgetAndDecrement()
- これらのメソッドは、通常のメモリ読み書き命令よりも高価で、直列ボトルネックになり得る
- カウンタを(non-atomicな)
enqSideSize
とdesideSize
に分割するのは一つの手enq()
とdeq()
では、それぞれを独立にインクリメントおよびデクリメントenqSideSize == capacity
が成立したら、enqSideSize
にdeqSideSize
を足して、後者は0にする- この操作は両ロックを取得して行う
- capacityが十分に大きいなら、毎回同期が発生するよりは、同期点をだいぶまばらにできる
基本は上限付きキューと同じ:
- 要素数のカウントは不要となった
- 待機用のConditionオブジェクトも不要
// Fig.10.7
public void enq(T x) {
enqLock.lock();
try {
Node e = new Node(x);
tail.next = e;
tail = e;
} finally {
enqLock.unlock();
}
}
// Fig.10.8
public T deq() throws EmptyException {
T result;
deqLock.lock();
try {
if (head.next == null) {
throw new EmptyException();
}
result = head.next.value;
head = head.next;
} finally {
deqLock.unlock();
}
return result;
}
補足:
- このキューはデッドロックしない
enq()
およびdeq()
は、それぞれ一つのロックしか取得しないため- 番兵ノードが削除されることはないので、
enq()
はロック獲得後すぐに成功するはず deq()
はキューが空なら失敗する- (10.3と同様に)論理キューの先端と末端は
head
とtail
の実際の値とは異なる- タイミング次第では、
head
がtail
を追い越すことも - 論理的な先端:
head
が指すノード - 論理的な末端: 先端から到達可能な最後のノード
- タイミング次第では、
enq()
およびdeq()
は__total__- 両方ともキューが空でも満杯でも待機しないため
- ! 並列プログラムと関係のない文脈では、例外が発生するような呼び出しが存在し得るメソッドは total と呼ばないことが多いような気がする
- ! progress condition (?)的な観点から total とか partial とか云っている?
この節ではLockFreeQueue<T>
クラスを扱う:
- 図の10.9から10.11が実装コード
- 上限無しトータルキューの自然な拡張
- 速いスレッドが遅いスレッドを助けることで、飢餓を防いでいる
- CAS使いたいので
Node.next
フィールドをvolatile
からAtomicReference
に変更 - 図10.12にキューの図 (コードと重複するので説明は省略)
// 以下はLockFreeQueue<T>クラスのスコープ内
// Fig. 10.9
public class Node {
public T value;
public AtomicReference<Node> next;
public Node(T value) {
this.value = value;
next = new AtomicReference<Node>(null);
}
}
// Fig. 10.10 (書き換え版)
public void enq(T value) {
Node node = new Node(value);
while (true) {
Node last = tail.get();
Node next = last.next.get();
// チェック1
if (last != tail.get()) {
// 他の誰かがtailの更新操作を行った
// ! このチェック自体は必須ではない (競合度が高い場合の性能劣化軽減?)
continue;
}
// チェック2
if (next != null) {
// 他の誰かがnextの更新操作を行った
// CASの失敗チェックはしない (失敗 == 他の誰かが同様の操作を行って成功した、ということなので)
tail.compareAndSet(last, netx); // tailを進める (自前で行うことにより、スケジューラに依存せずに処理が進められる)
continue;
}
// 末尾に要素追加
if (last.next.compareAndSet(next, node)) {
tail.compareAndSet(last, node); // tailを進める
return;
}
}
}
// Fig. 10.11 (書き換え版)
public T deq() throws EmptyException {
while (true) {
Node first = head.get();
Node last = tail.get();
Node next = first.next.get();
// チェック1
if (first != head.get()) {
// 他の誰かがheadの更新操作を行った
continue;
}
// チェック2
if (first == last) {
if (next == null) {
// キューが空
throw new EmptyException();
}
// nextがnullではないので、新要素のセット自体は終わっている
tail.compareAndSet(last, next); // tailを進める (自前で行うことにより、スケジューラに依存せずに処理が進められる)
continue;
}
// 先頭要素の取り出し
T value = next.value;
if (head.compareAndSet(first, next)) {
return value;
}
}
}
enc()
は__lazy__なのが興味深い:
- このメソッドをロックフリーにするためには、複数のスレッドが互いに助け合う必要がある
- キューに要素を追加するには以下の二つのステップが必要
-
next
に対してcompareAndSet()
を呼んで、新しいノードを追加する
-
tail
に対してcompareAndSet()
を呼んで、末尾ノードを(古いものから)最新のものへと更新する
-
- この二つのステップはアトミックには実行されない
- 他の全ての
enq()
を呼び出すスレッドは、他のスレッドのステップ1
だけが終わった状態に備える必要がある - これは六章で扱ったUniversal Constructionで見た
helpingテクニック
の実例
- 他の全ての
enq()
についての補足:
- トータル: dequeuerを待つことがない
- 成功時の線形化点は
tail
フィールドを新しいノードに更新した時- 更新者: 自スレッド or 他のお助けスレッド
deq()
についての補足:
- Fig 10.13
- なぜdequeuerが
tail
を進めるのを助ける必要があるのか tail
が、削除されようとしているノードを指したままにならないようにするため-
- 初期: head と tail が同じ番兵ノードを指している
-
- enqの途中: tail.nextにノードがセットされた
-
- deq: head.nextにノードがある => 取り出しで番兵ノードを進める
- => 古い番兵ノードは削除され、
tail
が指しているノードが無効に!
-
- ! JavaだとGCがあるので、別にdequeuerが協力しなくても問題がないような気もする (次節との兼ね合い?)
- なぜdequeuerが
- 線形化点
- 成功時の線形化点は要素の取り出し(CAS)時
- 失敗時の線形化点はキューが空だと判定された時
ロックフリー性について:
- このキューがロックフリーであることをチェックするのは簡単
- 全てのメソッド呼び出しは、初めに未完了の
enq()
呼び出しがないかをチェックし、必要なら完了させようとする - 最悪のケースでは全てのスレッドが
tail
フィールドを進めようとし、その中の一つが必ず成功する - enqueue or dequeue 時に、あるスレッドが(CASに)失敗する際には、必ず別のスレッドがメソッド呼び出しに成功している
- 全てのメソッド呼び出しは、初めに未完了の
- ロックフリーにするとキューの性能が十分に改善する
- ロックフリーアルゴリズムは、最も効率的なブロッキングアルゴリズムよりも、性能が優れている傾向がある
これまでのキューの実装は、JavaのGCに依存している:
- dequeueされたノード(が保持するメモリ)の再回収はGC任せ
- 自分でメモリ管理機能を実装しようとするとどうなるか?
- いくつかの理由で自前実装を行いたくなる場合がある:
- C/C++といったGCを備えていない言語で実装するなら普通の選択
- GCがある言語でも、ある種の領域では、自前の方がより効率的なメモリ管理が行えることがある
- 特に沢山の小さなオブジェクトの生成と削除が行われるなら
- GCがロックフリーではないなら、ロックフリーなメモリ管理を提供したいかも
ノードをロックフリーに再利用するための自然な方法:
- 各スレッドがプライベートな、使われていないキューのエントリを保持するための
free list
を管理する - enqueue時には、そのスレッドローカルなリストの先頭から要素を取り出す
- リストが空なら
new
を使って通常通りのノード割り当てを行う
- リストが空なら
- dequeue時には、取り出したノードをリストに格納する
- リストはスレッドローカルなので、(高価な)同期処理は一切不要
- 各スレッド内でのenqueueとdequeueがほぼ同数なら、このデザインは上手く働く
- 両者にバランスがとれていないなら、より複雑な手法が求められる
- ex. 定期的に他の(dequeueが多い)スレッドのリストから要素を奪う
上の方法を単純に前節のロックフリーキューに適用しても、上手く動作しない:
- Fig. 10.14: 問題となるケースの図示
- 状況: スレッドAはdequeuerで、
head
はノードaを、head.next
はノードbを指している -
- スレッドAが
head
を(ノードaからノードbに)更新するCASの直前まで進んだ
- スレッドAが
-
- スレッドAがCASを呼び出す前に、他のスレッドがいろいろと処理を進めた
- ノードaとノードbは、キューから取り出されて、フリーリストの中に移動
- その後、ノードaがenqueuerに再利用
- 最終的には、ノードaが再びキューの先頭まで移動
- ノードbは、依然としてフリーリストの中
-
- スレッドAが動き出しCASを実行
- 先頭要素はノードaなので、CAS操作は成功する
- => `head'が誤って、フリーリストの中にあるノードbに設定されてしまう
- 状況: スレッドAはdequeuerで、
- これがABA問題と呼ばれるもの
- CASのような条件付き同期操作を使った動的メモリアルゴリズムでは良く見かける現象
- 典型的には、まさにCASで更新されようとしている参照がaからbに変わり、再びaに戻る
- 結果的にCASは成功するが、それはすでに構造が変わっているデータに対してであって、望まれた効果は得られない
解決策は?
- 最も単純な方法としては
AtomicStampedReference<T>
を使うのが良い - アトミックなリファレンスにユニークなスタンプが付いたもの
- スタンプは単独ででも参照と同時にででも、アトミックに更新できる
- 詳細は Pragma 10.6.1 で
AtomicStampedReference<T>
クラス:
- T型のオブジェクトの参照と整数型のスタンプの両方を保持する
- Pragmra 9.8.1 で出てきた
AtomicMarkableReference<T>
の一般化 (boolean => integer)
スタンプの用途:
- 通常はABA問題を回避するために使われる
- オブジェクトを更新する度にスタンプを増加させる
- 時々は、有限の状態を表すために使用することもある (Chapter11の
LockFreeExchanger
)
// Fig. 10.15
//
// AtomicStampedReference<T>が提供するメソッド(の一部)
// スタンプ付きのCAS: expectedReferenceとexpectedStampの両方が一致する場合にのみ、成功する
public boolean compareAndSet(T expectedReference, T newReference,
int expectedStamp, int newStamp);
public T get(int[] stampHolder);
public void set(T newReference, int newStamp);
CやC++のような言語での実現方法:
- 64bitアーキテクチャ: ポインタからいくつかのビットを"stealing"することで実現可能
- 32bitアーキテクチャ: おそらく"a level of indirection"が要求される
Fig 10.16: ABA問題を避けるためにスタンプを使用したdeq()
の実装:
// AtomicStampedReference<T>を使用している以外は、特に以前との相違点はなし
public T deq() throws EmptyException {
int[] lastStamp = new int[1];
int[] firstStamp = new int[1];
int[] nextStamp = new int[1];
while (true) {
Node first = head.get(firstStamp);
Node last = tail.get(lastStamp);
Node next = first.next.get(nextStamp);
if (first == last) {
if (next == null) {
throw new EmptyException();
}
tail.compareAndSet(last, next, lastStamp[0], lastStamp[0] + 1); // スタンプ付き
} else {
T value = next.value;
if (head.compareAndSet(first, next, firstStamp[0], firstStamp[0] + 1) { // スタンプ付き
free(first); // ノード解放 (フリーリストへ)
return value;
}
}
}
}
ABA問題は沢山の同期シナリオで起こり得る:
- 条件付き同期だけではない
- 例えば loads and stores だけを使っている場合にも起こり得る
- load-linked/sotre-conditional のような条件付き同期操作(see: Appendix B)ならABA問題を回避できる
- 時間上の二つの点の間で、特定の値が等しいかだけでなく、更新が行われたかどうかまでチェックできる
より tighter kind な同期についても見ていこう:
- 複数(one or more)のproducerスレッドがアイテムを生成し、複数のconsumerスレッドがFIFO順で取り除く
- ただし、producerとconsumerはランデブーする
- producerは、consumerが要素を取り出すまでブロックする
- このようなランデブー同期は、CSPやAdaのような言語には標準で組み込まれている (Go言語も)
Fig. 10.17は、単純なモニターベースのsynchronousキューの実装
public class SynchronousQueue<T> {
T item = null;
boolean enqueuing;
Lock lock;
Condition condition;
...
public void enq(T value) {
lock.lock();
try {
// 他にもenqueuerがいるなら待機
while (enqueuing)
condition.await();
// 要素追加
enqueuing = true;
item = value;
condition.signalAll(); // dequeuerに対する通知
while (item != null)
condition.await(); // dequeuerが取り出すまで待機 (ランデブー)
// enqueueが終わったことを通知
enqueuing = false;
condition.signalAll(); // enqueuerに対する通知
} finally {
lock.unlock();
}
}
public T deq() {
lock.lock();
try {
// enqueuerを待機 (ランデブー)
while (item == null)
condition.await();
// 要素取り出し
T t = item;
item = null;
condition.signalAll(); // enqueuerに対する通知
return t;
} finally {
lock.unlock();
}
}
}
このキューのデザインは比較的シンプルだが、高い同期コストを負っている:
- あるスレッドが別のスレッドを呼び起こす全ての点で、enqueuersとdequeuersの両方が、全ての待機スレッドを呼び起こす
- 待機スレッド数に対して二次オーダーのwakeupsを引き起こす
- wakeupsの数を抑えるために、複数の条件オブジェクトを使用することは可能
- ただしまだ全ての呼び出しをブロックすることは必要で、それは高価
synchronousキューの同期オーバヘッドを軽減するにはどうするか?
enq()
とdeq()
を二つのステップに分けたキューの実装を考えてみる- 例: 空のキューからdequeuerがどうやって要素を取り出そうとするか
-
- 予約オブジェクトをキューに入れる (dequeuerが待機状態であることを示す)
-
- dequeuerは予約オブジェクトが持つフラグの上でスピンする
-
- その後、enqueuerが予約オブジェクトを見つけたら、以下を行い予約を果たす(fullfill)
- アイテムを預ける
- 予約オブジェクトのフラグを設定することで、dequeuerに通知する
- enqueuerが待機する場合も同様に、自分の予約オブジェクトを作成し、フラグ上でスピンする
- キューは常に「enq予約保持」or「deq予約保持」or「空」のいずれかの状態である
この構造は二重データ構造(dual data structur)と呼ばれる:
- メソッドが「reservation」と「fulfillment」の二つのステップに分かれて実行されるため
沢山の良い性質を備えている:
- 待機スレッドがローカルにキャッシュされたフラグ上でスピンできる (which we have seen is essential for scalability)
- 公平性を自然な方法で保障する
- 予約は到着順にキューイングされ、リクエストが同じ順で果たされる(fulfilled)のが保障される
- NOTE: このデータ構造は線形化可能
- それぞれの部分メソッド呼び出しは、それが果たされた時点で順序付け可能なため
実装:
- このキューはノードのリストとして実装可能
- ノードは次のどちらかの一つを表す (
type
フィールドでどちらか判別)- dequeueされるのを待っている状態のアイテム (for enqueuer)
- fulfillされるのを待っている状態の予約 (for dequeuer)
- At any time, 全てのキューのノードは同じタイプである
- キューは「dequeue待ちのアイテム群のみで構成される」か「fulfill待ちの予約群のみで構成される」かのいずれか
// Fig. 10.18
//
// SyncronousDualQueue<T>のスコープ内
private enum NodeType {ITEM, RESERVATION};
private class Node {
volatile NodeType type;
volatile AtomicReference<T> item;
volatile AtomicReference<Node> next;
// enqueuerの場合: new Node(someValue, ITEM);
// dequeuerの場合: new Node(null, RESERVATION);
Node(T myItem, NodeType myType) {
item = new AtomicReference<T>(myItem);
next = new AtomicReference<T>(null);
type = myType
}
}
// Fig. 10.19 (整理版)
//
// SynchronousDualQueue<T>のスコープ内
public SynchronousDualQueue() {
Node sentinel = new Node(null, NodeType.ITEM);
head = new AtomicReference<Node>(sentinel);
tail = new AtomicReference<Node>(sentinel);
}
public void enq(T e) {
while (true) {
Node t = tail.get(), h = head.get();
if (h == t || t.type == NodeType.ITEM) {
// 待機中のdequeuerが存在しない
if (waitDequeuer(h, t, e)) {
return;
}
} else {
// 待機中のdequeuerが存在する
if (fulfillItem(t, h, e)) {
return;
}
}
}
}
boolean waitDequeuer(Node t, Node h, T e) {
Node n = t.next.get();
if (t != tail.get()) {
// 他の誰かと競合している (retry)
return false;
}
if (n != null) {
// tailが最終ノードを指していない (tailを進めてretry)
tail.compareAndSet(t, n);
return false;
}
// dequeuer待機用の要素の追加
Node offer = new Node(e, NodeType.ITEM);
if (! t.next.compareAndSet(n, offer)) {
// 他の誰かが先に追加していた (retry)
return false;
}
// 追加成功
tail.compareAndSet(t, offer); // 失敗しても良いのでとりあえずtailを進めてみる (ロックフリーキューの時と同様)
while (offer.item.get() == e); // ローカルオブジェクト上でスピン
// dequeuerが来たのでループを抜けた
// headを進めておく:
// This step serves only to enhance performance, because the implementation remains correct,
// whether or not the method advances the head reference.
h = head.get();
if (offer == h.next.get())
head.compareAndSet(h, offer);
return true;
}
boolean fulfillItem(Node t, Node h, T e) {
Node n = h.next.get();
if (t != tail.get() || h != head.get() || n == null) {
// 他の誰かと競合している (retry)
return false;
}
boolean success = n.item.compareAndSet(null, e); // fulfill
head.compareAndSet(h, n); // 失敗しても良いのでとりあえずheadを進めてみる (ロックフリーキューの時と同様)
return success;
}
省略