Created
August 9, 2022 03:26
-
-
Save shuzheng/1359eff21dc91cc342af25ec6d8fe419 to your computer and use it in GitHub Desktop.
缓冲队列: 一个线程安全的Queue,可实现缓冲功能,当满足指定缓冲数量或缓冲时间两个条件任意一个时,执行回调方法返回缓冲数据
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Queue; | |
import java.util.concurrent.ConcurrentLinkedDeque; | |
/** | |
* 缓冲队列 | |
* 一个线程安全的Queue,可实现缓冲功能,当满足指定缓冲数量或缓冲时间两个条件任意一个时,执行回调方法返回缓冲数据 | |
* @author shuzheng | |
* @date 2022/8/9 | |
*/ | |
public class BufferQueue<T> { | |
/** | |
* 队列 | |
*/ | |
private Queue<T> queue; | |
/** | |
* 队列监控线程 | |
*/ | |
private BufferQueueThread bufferQueueThread; | |
public BufferQueue(int bufferBatchCount, int bufferMillis, BufferQueueCallBack<List<T>> bufferQueueCallBack) { | |
this.queue = new ConcurrentLinkedDeque<>(); | |
this.bufferQueueThread = new BufferQueueThread(queue, bufferBatchCount, bufferMillis, bufferQueueCallBack); | |
this.bufferQueueThread.start(); | |
Runtime.getRuntime().addShutdownHook(new Thread(() -> { | |
bufferQueueThread.flush(); | |
})); | |
} | |
/** | |
* 进队列方法 | |
* @param t | |
*/ | |
public void add(T t) { | |
queue.add(t); | |
} | |
} | |
/** | |
* BufferQueue的队列监控线程 | |
* Created by ZhangShuzheng on 2018/5/26. | |
*/ | |
class BufferQueueThread<T> extends Thread { | |
/** | |
* 监控队列 | |
*/ | |
private Queue<T> queue; | |
/** | |
* 缓冲元素数量阈值 | |
*/ | |
private int bufferBatchCount; | |
/** | |
* 缓冲时间阈值 | |
*/ | |
private int bufferMillis; | |
/** | |
* 最后回调时间 | |
*/ | |
private long lastCallBackMilliSecond = System.currentTimeMillis(); | |
/** | |
* 回调接口 | |
*/ | |
private BufferQueueCallBack<List<T>> bufferQueueCallBack; | |
public BufferQueueThread(Queue<T> queue, int bufferBatchCount, int bufferMillis, BufferQueueCallBack<List<T>> bufferQueueCallBack) { | |
this.queue = queue; | |
this.bufferBatchCount = bufferBatchCount; | |
this.bufferMillis = bufferMillis; | |
this.bufferQueueCallBack = bufferQueueCallBack; | |
} | |
@Override | |
public void run() { | |
while (true) { | |
long currentTimeMillis = System.currentTimeMillis(); | |
int pollCount = 0; | |
// 当队列中缓冲数量达到指定数量阈值,将缓冲元素出队列 | |
if (queue.size() >= bufferBatchCount) { | |
pollCount = bufferBatchCount; | |
} else { | |
long diffMillis = currentTimeMillis - lastCallBackMilliSecond; | |
// 当达到缓冲时间阈值,则所有元素出队列 | |
if (diffMillis > bufferMillis || diffMillis < 0) { | |
pollCount = queue.size(); | |
} | |
} | |
if (pollCount > 0) { | |
callBack(pollCount); | |
} else { | |
try { | |
Thread.sleep(100); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
} | |
/** | |
* 回调指定数量的元素 | |
* @param pollCount | |
*/ | |
private void callBack(int pollCount) { | |
List<T> items = new ArrayList<>(pollCount); | |
for (int i = 0; i < pollCount; i++) { | |
T item = queue.poll(); | |
items.add(item); | |
} | |
bufferQueueCallBack.run(items); | |
lastCallBackMilliSecond = System.currentTimeMillis(); | |
} | |
/** | |
* 清空缓冲元素 | |
*/ | |
public void flush() { | |
while (queue.size() > 0) { | |
int queueCount = queue.size(); | |
int pollCount = queueCount > bufferBatchCount ? bufferBatchCount : queueCount; | |
callBack(pollCount); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* BufferQueue的回调接口 | |
* @author shuzheng | |
* @date 2022/8/9 | |
*/ | |
public interface BufferQueueCallBack<T> { | |
/** | |
* 回调方法 | |
* @param t | |
*/ | |
void run(T t); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
BufferQueue<String> bufferQueue = new BufferQueue<>(100, 5 * 1000, items -> { | |
System.out.println(items.size()); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment