Created
January 14, 2020 16:27
-
-
Save nyatla/f8dd4393d22546d7b170f9164156906c to your computer and use it in GitHub Desktop.
シャットダウン機能のあるマルチスレッドのキューオブジェクト.JavaのBlockingQueueはシャットダウン機能がないのでつくった。
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.Closeable; | |
import java.io.IOException; | |
import java.util.concurrent.ArrayBlockingQueue; | |
import java.util.concurrent.Semaphore; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* シャットダウン機能のあるマルチスレッドのキューオブジェクト. | |
* すべての関数はマルチスレッドセーフです。 | |
* | |
* @param <T> | |
*/ | |
public class CloseableBlockingQueue<T> implements Closeable{ | |
//offer判定のセマフォ | |
final private Semaphore s_offer=new Semaphore(1); | |
final private Semaphore s_pool=new Semaphore(1); | |
enum ClosingStatus{ | |
NONE, //何も閉じていない | |
CLOSING,//クローズ中 | |
}; | |
/** close操作が行われているかのフラグ*/ | |
private ClosingStatus closing_status=ClosingStatus.NONE; | |
ArrayBlockingQueue<T> q; | |
CloseableBlockingQueue(int capacity) throws InterruptedException{ | |
this.q=new ArrayBlockingQueue<T>(capacity); | |
for(int i=0;i<capacity-1;i++) { | |
this.s_offer.release(); | |
} | |
this.s_pool.acquire(); | |
} | |
/** | |
* キューにオブジェクトを追加する。 | |
* @param item | |
* @param timeout_in_ms | |
* @throws InterruptedException | |
* @throws IOException | |
* キューが閉じられた。 | |
*/ | |
public boolean offer(T item,long timeout_in_ms) throws InterruptedException, IOException{ | |
boolean go=this.s_offer.tryAcquire(timeout_in_ms,TimeUnit.MILLISECONDS); | |
synchronized(this) { | |
final ClosingStatus cs=this.closing_status; | |
switch(cs) { | |
case CLOSING: | |
//クローズ要求時 | |
if(go) | |
{ | |
this.s_offer.release();//ほかのスレッドも開放するため | |
} | |
throw new IOException(); | |
case NONE: | |
if(go) | |
{ | |
if(!this.q.offer(item)) { | |
throw new Error();//設計上失敗してはいけないの。 | |
} | |
//offerが成功したのでpoolをキック | |
this.s_pool.release(); | |
return true; | |
} | |
return false; | |
default: | |
throw new Error(); | |
} | |
} | |
} | |
/** | |
* キューからオブジェクトを読み出す。 | |
* @param timeout_in_ms | |
* @return | |
* @throws InterruptedException | |
* @throws IOException | |
* キューが閉じられ、空になった。 | |
*/ | |
public T pool(long timeout_in_ms) throws InterruptedException, IOException { | |
boolean go=this.s_pool.tryAcquire(timeout_in_ms,TimeUnit.MILLISECONDS); | |
//この隙間 | |
synchronized(this) { | |
final ClosingStatus cs=this.closing_status; | |
switch(cs) { | |
case CLOSING: | |
if(go) { | |
this.s_pool.release();//ほかのスレッドをkillするための再生成。 | |
T o=this.q.poll(); | |
if(o==null) { | |
//終端到達 | |
throw new IOException("END OF Q"); | |
} | |
return o; | |
} | |
//closingでタイムアウト。レアケース。"この隙間"でcloseが実行された場合ね。 | |
return null; | |
case NONE: | |
if(go) { | |
T o=this.q.poll(); | |
if(o==null) { | |
throw new Error(); | |
} | |
this.s_offer.release(); | |
return o; | |
} | |
//この隙間でofferが実行された場合 | |
return null; | |
default: | |
throw new Error(); | |
} | |
} | |
} | |
/** | |
* キューをシャットダウンする。 | |
* 関数を呼び出した時点で、メンバ関数の動作は以下のように変更される。 | |
* offer関数 - ブロックが解除され、例外が発生する。新たな呼び出しは失敗する。 | |
* pool関数 - ブロックが解除され、例外が発生する。未読出し要素がある場合は空になるまでの間呼び出しは成功する。 | |
* @param is_clear | |
* trueを設定すると未読出しの要素をクリアする。 | |
* @throws InterruptedException | |
*/ | |
public void shutdown(boolean is_clear) | |
{ | |
//closingをセットしてrelease | |
synchronized(this) { | |
switch(this.closing_status) { | |
case CLOSING: | |
//何もしない。 | |
break; | |
case NONE: | |
if(is_clear) { | |
this.q.clear(); | |
} | |
this.closing_status=ClosingStatus.CLOSING; | |
this.s_offer.release(); | |
this.s_pool.release(); | |
break; | |
default: | |
throw new Error(); | |
} | |
} | |
return; | |
} | |
/** | |
* {@link #shutdown(bool)}にtrueを設定した関数と同じ。 | |
* 関数が完了した後はすべての関数が使用不能になる。 | |
*/ | |
@Override | |
public void close() { | |
this.shutdown(true); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment