Skip to content

Instantly share code, notes, and snippets.

@nyatla
Created January 14, 2020 16:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nyatla/f8dd4393d22546d7b170f9164156906c to your computer and use it in GitHub Desktop.
Save nyatla/f8dd4393d22546d7b170f9164156906c to your computer and use it in GitHub Desktop.
シャットダウン機能のあるマルチスレッドのキューオブジェクト.JavaのBlockingQueueはシャットダウン機能がないのでつくった。
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* シャットダウン機能のあるマルチスレッドのキューオブジェクト.
* すべての関数はマルチスレッドセーフです。
*
* @param <T>
*/
public class CloseableBlockingQueue<T> implements Closeable{
//offer判定のセマフォ
final private Semaphore s_offer=new Semaphore(1);
final private Semaphore s_pool=new Semaphore(1);
enum ClosingStatus{
NONE, //何も閉じていない
CLOSING,//クローズ中
};
/** close操作が行われているかのフラグ*/
private ClosingStatus closing_status=ClosingStatus.NONE;
ArrayBlockingQueue<T> q;
CloseableBlockingQueue(int capacity) throws InterruptedException{
this.q=new ArrayBlockingQueue<T>(capacity);
for(int i=0;i<capacity-1;i++) {
this.s_offer.release();
}
this.s_pool.acquire();
}
/**
* キューにオブジェクトを追加する。
* @param item
* @param timeout_in_ms
* @throws InterruptedException
* @throws IOException
* キューが閉じられた。
*/
public boolean offer(T item,long timeout_in_ms) throws InterruptedException, IOException{
boolean go=this.s_offer.tryAcquire(timeout_in_ms,TimeUnit.MILLISECONDS);
synchronized(this) {
final ClosingStatus cs=this.closing_status;
switch(cs) {
case CLOSING:
//クローズ要求時
if(go)
{
this.s_offer.release();//ほかのスレッドも開放するため
}
throw new IOException();
case NONE:
if(go)
{
if(!this.q.offer(item)) {
throw new Error();//設計上失敗してはいけないの。
}
//offerが成功したのでpoolをキック
this.s_pool.release();
return true;
}
return false;
default:
throw new Error();
}
}
}
/**
* キューからオブジェクトを読み出す。
* @param timeout_in_ms
* @return
* @throws InterruptedException
* @throws IOException
* キューが閉じられ、空になった。
*/
public T pool(long timeout_in_ms) throws InterruptedException, IOException {
boolean go=this.s_pool.tryAcquire(timeout_in_ms,TimeUnit.MILLISECONDS);
//この隙間
synchronized(this) {
final ClosingStatus cs=this.closing_status;
switch(cs) {
case CLOSING:
if(go) {
this.s_pool.release();//ほかのスレッドをkillするための再生成。
T o=this.q.poll();
if(o==null) {
//終端到達
throw new IOException("END OF Q");
}
return o;
}
//closingでタイムアウト。レアケース。"この隙間"でcloseが実行された場合ね。
return null;
case NONE:
if(go) {
T o=this.q.poll();
if(o==null) {
throw new Error();
}
this.s_offer.release();
return o;
}
//この隙間でofferが実行された場合
return null;
default:
throw new Error();
}
}
}
/**
* キューをシャットダウンする。
* 関数を呼び出した時点で、メンバ関数の動作は以下のように変更される。
* offer関数 - ブロックが解除され、例外が発生する。新たな呼び出しは失敗する。
* pool関数 - ブロックが解除され、例外が発生する。未読出し要素がある場合は空になるまでの間呼び出しは成功する。
* @param is_clear
* trueを設定すると未読出しの要素をクリアする。
* @throws InterruptedException
*/
public void shutdown(boolean is_clear)
{
//closingをセットしてrelease
synchronized(this) {
switch(this.closing_status) {
case CLOSING:
//何もしない。
break;
case NONE:
if(is_clear) {
this.q.clear();
}
this.closing_status=ClosingStatus.CLOSING;
this.s_offer.release();
this.s_pool.release();
break;
default:
throw new Error();
}
}
return;
}
/**
* {@link #shutdown(bool)}にtrueを設定した関数と同じ。
* 関数が完了した後はすべての関数が使用不能になる。
*/
@Override
public void close() {
this.shutdown(true);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment