Skip to content

Instantly share code, notes, and snippets.

@sunzsh
Created May 28, 2022 17:59
Show Gist options
  • Star 26 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save sunzsh/fb8ae2624550dd58b088162e2957c8f2 to your computer and use it in GitHub Desktop.
Save sunzsh/fb8ae2624550dd58b088162e2957c8f2 to your computer and use it in GitHub Desktop.
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* @author sunzsh
* @param <T> 延迟业务数据的类型
*/
public abstract class ContinuousDelayBaseService<T> {
private static final DelayQueue<DelayBean> queue = new DelayQueue<>();
private static ExecutorService asyncDoneExecutor = Executors.newCachedThreadPool();
private static final Thread thread = new Thread(() -> {
Thread.currentThread().setName("ContinuousDelay-main");
while (true) {
try {
DelayBean delayBean = queue.take();
asyncDoneExecutor.submit(() -> {
Thread.currentThread().setName("ContinuousDelay-" + delayBean.id);
delayBean.done.accept(delayBean.data);
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
static {
thread.start();
}
/**
* 最后一次超时回调
* @param value
*/
protected abstract void done(T value);
/**
* @param delayMs 延迟毫秒数
* @param callback4InitOrIfExists 回调函数:初次调用时,参数为空;重复调用时,参数为上次调用的返回值
*/
public synchronized void delay(Long delayMs, Function<T, T> callback4InitOrIfExists) {
DelayBean delayBean = this.getDelayBean();
if (delayBean == null) {
T newValue = callback4InitOrIfExists != null ? callback4InitOrIfExists.apply(null) : null;
delayBean = new DelayBean(this.getClass().getName(), delayMs, newValue);
delayBean.done = (value) -> this.done((T) value);
} else {
deleteById(this.getClass().getName());
if (callback4InitOrIfExists != null) {
Object newValue = callback4InitOrIfExists.apply((T) delayBean.data);
delayBean.data = newValue;
}
delayBean.setDelay(delayMs);
}
queue.offer(delayBean);
}
/**
*获取延时队列数据
*/
private DelayBean getDelayBean() {
Iterator<DelayBean> iterator = queue.iterator();
while (iterator.hasNext()) {
DelayBean next = iterator.next();
if (Objects.equals(next.id, this.getClass().getName())) {
return next;
}
}
return null;
}
private boolean deleteById(String id) {
return queue.remove(new DelayBean(id));
}
protected static class DelayBean implements Delayed {
private String id;
long delay; //延迟时间
long expire; //到期时间
Object data; //数据
Consumer<Object> done;
protected void setDelay(long delay) {
expire = System.currentTimeMillis() + delay;
}
public DelayBean(String id) {
this.id = id;
}
public DelayBean(String id, long delay, Object data){
this.id = id;
this.delay = delay;
setDelay(delay); //到期时间 = 当前时间+延迟时间
this.data = data;
}
/**
* 需要实现的接口,获得延迟时间 用过期时间-当前时间
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public int hashCode() {
return this.getClass().hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == null)
return false;
if (!(obj instanceof DelayBean)) {
return false;
}
return Objects.equals(this.id, ((DelayBean)obj).id);
}
}
}
@yunjiangs
Copy link

您好朋友,我使用这个类做了一些测试,然而可能是我使用不当,this.getClass().getName()每次返回的值相同,所以对代码有些理解偏差,若有空还请解答疑惑,在此感谢。

正确的测试使用方式该如何调用呢?
我是for循环了一个object,然后第一次函数式调用赋值了null,后面的“重复调用”不是很理解。
”callback4InitOrIfExists 回调函数:初次调用时,参数为空;重复调用时,参数为上次调用的返回值"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment