Skip to content

Instantly share code, notes, and snippets.

@shuzheng
Created August 9, 2022 03:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shuzheng/1359eff21dc91cc342af25ec6d8fe419 to your computer and use it in GitHub Desktop.
Save shuzheng/1359eff21dc91cc342af25ec6d8fe419 to your computer and use it in GitHub Desktop.
缓冲队列: 一个线程安全的Queue,可实现缓冲功能,当满足指定缓冲数量或缓冲时间两个条件任意一个时,执行回调方法返回缓冲数据
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedDeque;
/**
 * Buffering queue.
 *
 * <p>A thread-safe queue that batches its elements: as soon as either the configured
 * element count or the configured buffering time is reached, the buffered elements are
 * handed to a callback.
 *
 * <p>Each instance starts one background monitoring thread and registers one JVM shutdown
 * hook that flushes whatever is still buffered when the JVM shuts down.
 *
 * @param <T> type of the buffered elements
 * @author shuzheng
 * @date 2022/8/9
 */
public class BufferQueue<T> {

    /** Backing queue shared with the monitoring thread. */
    private final Queue<T> queue;

    /** Thread that watches the queue and delivers batches to the callback. */
    private final BufferQueueThread<T> bufferQueueThread;

    /**
     * Creates the queue, starts its monitoring thread and registers the shutdown flush.
     *
     * @param bufferBatchCount    number of buffered elements that triggers a callback; must be positive
     * @param bufferMillis        buffering time in milliseconds after which pending elements are delivered
     * @param bufferQueueCallBack receives each batch of buffered elements; must not be null
     * @throws IllegalArgumentException if {@code bufferBatchCount} is not positive
     * @throws NullPointerException     if {@code bufferQueueCallBack} is null
     */
    public BufferQueue(int bufferBatchCount, int bufferMillis, BufferQueueCallBack<List<T>> bufferQueueCallBack) {
        // Fail fast: a non-positive batch size never delivers a batch and breaks the shutdown flush.
        if (bufferBatchCount <= 0) {
            throw new IllegalArgumentException("bufferBatchCount must be positive: " + bufferBatchCount);
        }
        if (bufferQueueCallBack == null) {
            throw new NullPointerException("bufferQueueCallBack");
        }
        this.queue = new ConcurrentLinkedDeque<>();
        BufferQueueThread<T> monitor =
                new BufferQueueThread<>(queue, bufferBatchCount, bufferMillis, bufferQueueCallBack);
        this.bufferQueueThread = monitor;
        // The monitor loops forever; as a daemon it no longer keeps the JVM alive on its own,
        // so a normal exit can reach the shutdown hook below and deliver the remaining elements.
        monitor.setDaemon(true);
        monitor.start();
        Runtime.getRuntime().addShutdownHook(new Thread(monitor::flush));
    }

    /**
     * Appends an element to the buffer.
     *
     * @param t element to buffer
     */
    public void add(T t) {
        queue.add(t);
    }
}
/**
 * Queue-monitoring thread used by BufferQueue.
 *
 * <p>Repeatedly inspects the queue and hands buffered elements to the callback when either
 * the element-count threshold or the time threshold is reached. {@link #flush()} may be
 * called from a different thread (BufferQueue does so from a JVM shutdown hook), so the
 * draining code tolerates another thread emptying the queue at the same time.
 *
 * <p>Interrupting the thread stops the monitoring loop.
 *
 * Created by ZhangShuzheng on 2018/5/26.
 *
 * @param <T> type of the buffered elements
 */
class BufferQueueThread<T> extends Thread {

    /** Pause between two inspections of the queue when nothing is due, in milliseconds. */
    private static final long IDLE_SLEEP_MILLIS = 100L;

    /** Monitored queue. */
    private final Queue<T> queue;

    /** Element-count threshold; also the maximum size of a count-triggered batch. */
    private final int bufferBatchCount;

    /** Time threshold in milliseconds. */
    private final int bufferMillis;

    /** Time of the last callback; volatile because flush() may run on another thread. */
    private volatile long lastCallBackMilliSecond = System.currentTimeMillis();

    /** Callback that receives each batch. */
    private final BufferQueueCallBack<List<T>> bufferQueueCallBack;

    /**
     * @param queue               queue to monitor; must not be null
     * @param bufferBatchCount    element-count threshold; must be positive
     * @param bufferMillis        time threshold in milliseconds
     * @param bufferQueueCallBack receives each batch; must not be null
     * @throws IllegalArgumentException if {@code bufferBatchCount} is not positive
     * @throws NullPointerException     if {@code queue} or {@code bufferQueueCallBack} is null
     */
    public BufferQueueThread(Queue<T> queue, int bufferBatchCount, int bufferMillis, BufferQueueCallBack<List<T>> bufferQueueCallBack) {
        if (queue == null) {
            throw new NullPointerException("queue");
        }
        if (bufferQueueCallBack == null) {
            throw new NullPointerException("bufferQueueCallBack");
        }
        // A zero batch size would make flush() spin forever; a negative one can never be delivered.
        if (bufferBatchCount <= 0) {
            throw new IllegalArgumentException("bufferBatchCount must be positive: " + bufferBatchCount);
        }
        this.queue = queue;
        this.bufferBatchCount = bufferBatchCount;
        this.bufferMillis = bufferMillis;
        this.bufferQueueCallBack = bufferQueueCallBack;
    }

    /**
     * Monitoring loop: delivers a batch whenever one is due, otherwise sleeps briefly.
     * Returns once the thread has been interrupted.
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            int pollCount = duePollCount();
            if (pollCount > 0) {
                try {
                    callBack(pollCount);
                } catch (RuntimeException e) {
                    // One failing batch must not end monitoring for good: report the failure
                    // through the thread's uncaught-exception handler and keep going.
                    Thread current = Thread.currentThread();
                    current.getUncaughtExceptionHandler().uncaughtException(current, e);
                }
            } else {
                try {
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                } catch (InterruptedException e) {
                    // Interruption is the stop request: restore the flag and leave the loop.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /**
     * Decides how many elements are due for delivery right now.
     *
     * @return {@code bufferBatchCount} when the count threshold is reached, the current queue
     *         size when the time threshold has passed, otherwise 0
     */
    private int duePollCount() {
        int size = queue.size();
        if (size >= bufferBatchCount) {
            return bufferBatchCount;
        }
        long diffMillis = System.currentTimeMillis() - lastCallBackMilliSecond;
        // A negative difference means the wall clock moved backwards; treat it as expired.
        if (diffMillis > bufferMillis || diffMillis < 0) {
            return size;
        }
        return 0;
    }

    /**
     * Removes up to {@code pollCount} elements from the queue and passes them to the callback.
     * The callback is skipped when no element could be removed.
     *
     * @param pollCount maximum number of elements to deliver
     */
    private void callBack(int pollCount) {
        List<T> items = new ArrayList<>(pollCount);
        for (int i = 0; i < pollCount; i++) {
            T item = queue.poll();
            if (item == null) {
                // Another thread (run() vs. flush()) drained the queue first; never hand out nulls.
                break;
            }
            items.add(item);
        }
        if (items.isEmpty()) {
            return;
        }
        try {
            bufferQueueCallBack.run(items);
        } finally {
            // Record the attempt even if the callback threw, so the time threshold restarts.
            lastCallBackMilliSecond = System.currentTimeMillis();
        }
    }

    /**
     * Delivers everything still buffered, in batches of at most {@code bufferBatchCount}.
     */
    public void flush() {
        int remaining;
        while ((remaining = queue.size()) > 0) {
            callBack(Math.min(remaining, bufferBatchCount));
        }
    }
}
/**
 * Callback interface of BufferQueue.
 *
 * <p>Declares a single abstract method, so it can be implemented with a lambda or a
 * method reference.
 *
 * @param <T> type of the value passed to the callback
 * @author shuzheng
 * @date 2022/8/9
 */
@FunctionalInterface
public interface BufferQueueCallBack<T> {

    /**
     * Callback method.
     *
     * @param t the value handed to the callback
     */
    void run(T t);
}
// Usage example: batch threshold of 100 elements, time threshold of 5000 ms;
// the callback prints the size of every batch it receives.
BufferQueue<String> bufferQueue =
        new BufferQueue<>(100, 5_000, batch -> System.out.println(batch.size()));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment