Skip to content

Instantly share code, notes, and snippets.

@JunqiangYang
Last active July 10, 2017 08:24
Show Gist options
  • Save JunqiangYang/c10c02ee2b4be859370bd056fdda117a to your computer and use it in GitHub Desktop.
Save JunqiangYang/c10c02ee2b4be859370bd056fdda117a to your computer and use it in GitHub Desktop.
timer 定时器本地实现
import java.util.Calendar;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Example of scheduling a daily job on a {@link TimerWheel}.
 *
 * <p>{@link #init()} registers one periodic task that first fires at the next
 * 01:50:00 local time and then repeats every 86400 seconds.
 *
 * @author Junqiang.Yang
 * @create 2017-06-30 16:07
 **/
public class Demo {
    /** Thread name given to the timer wheel created by {@link #init()}. */
    private static final String WHEEL_NAME = "DemoTimerWheel";

    /** Optional executor for the wheel; when {@code null} the wheel uses its default pool. */
    private Executor executor;
    /** Scheduler the demo task is submitted to; created lazily by {@link #init()}. */
    private TimerWheel wheel;
    /** Set once {@link #init()} has registered the task, so it is not registered twice. */
    private boolean isInit = false;
    // boot not started yet
    private static final AtomicBoolean boot_start = new AtomicBoolean(false);
    // repayment not started yet
    private static final AtomicBoolean repayment_start = new AtomicBoolean(false);

    /**
     * Submits the daily "mid detail" task to {@link #wheel}.
     *
     * <p>The task refuses overlapping executions ({@code canPassWhenCollision()}
     * returns {@code false}) and only logs when an overlap is detected.
     */
    private void addMidDetailTask() {
        wheel.submit(new TimedTask() {
            @Override
            public boolean canPassWhenCollision() {
                return false;
            }

            @Override
            public void doWhenCollision() {
                BizLogger.info("addMidDetailTask Task Collision");
            }

            @Override
            public void run() {
                BizLogger.info("begin addMidDetailTask");
                // The actual job goes here.
                BizLogger.info("end addMidDetailTask");
            }
        }, TimerWheel.getDelayToNextTime(1, 50, 0), true, 86400, -1);
    }

    /**
     * Logs the configured application role and schedules the daily task.
     *
     * <p>The timer wheel is created here when none has been set, because no
     * other code in this class assigns it; without this the task submission
     * would fail with a {@link NullPointerException}. Calling this method
     * again after a successful initialisation does nothing.
     */
    public void init() {
        if (isInit) {
            return;
        }
        BizLogger.info(" role is " + AppConfig.get("app.role", ""));
        if (wheel == null) {
            wheel = (executor != null)
                    ? new TimerWheel(WHEEL_NAME, executor)
                    : new TimerWheel(WHEEL_NAME);
        }
        addMidDetailTask();
        isInit = true;
    }
}
/**
 * Base class for jobs scheduled on the timer wheel.
 *
 * <p>All callbacks have harmless defaults; subclasses override the ones they need.
 */
public class TimedTask implements Runnable {

    /** Creates a task that keeps the default behaviour of every callback. */
    public TimedTask() {
    }

    /**
     * Tells the scheduler what to do on a collision, i.e. when a new run is
     * triggered while the previous run has not finished yet.
     *
     * @return {@code true} to run anyway, {@code false} to skip the run;
     *         the default is {@code true}
     */
    public boolean canPassWhenCollision() {
        return true;
    }

    /**
     * Hook executed instead of {@link #run()} when a collision is detected
     * and the run is skipped. Does nothing by default.
     */
    public void doWhenCollision() {
    }

    /**
     * The work performed on a regular invocation. Does nothing by default.
     */
    @Override
    public void run() {
    }
}
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Scheduling record that wraps one {@link TimedTask}: it stores the delay,
 * the period and the remaining invocation budget, and splits the delay into
 * day / hour / minute / second fields.
 *
 * <p>As a {@link Runnable} it executes the wrapped task and detects
 * collisions, i.e. a run that starts while an earlier run is still going.
 */
class TimerRecord implements Runnable {
    private static final AtomicInteger counter = new AtomicInteger(0);
    // unique id
    private final int id = counter.getAndIncrement();

    public static final int DAY_OF_MONTH = 0;
    public static final int HOUR_OF_DAY = 1;
    public static final int MINUTE = 2;
    public static final int SECOND = 3;
    public static final int[] UNITS = {DAY_OF_MONTH, HOUR_OF_DAY, MINUTE, SECOND};
    /** Radix of each unit, indexed like {@link #UNITS}. */
    public static final int[] UNIT_MAX_VALUES = {32, 24, 60, 60};
    /** Largest supported delay in seconds (31 days). */
    public static final int MAX_DELAY = 31 * 24 * 60 * 60;

    /** The delay split into per-unit digits, filled by {@link #initDelayUnit(int)}. */
    private final int[] unitValues = new int[UNITS.length];
    // whether this is a periodic task
    private boolean isPeriod;
    private int delay;
    // scheduling period
    private int period;
    // remaining number of invocations
    private int leftInvokes;
    private TimedTask task;
    /**
     * Number of executions of the task currently in progress. A counter is
     * used instead of a boolean flag so that a finished overlapping run does
     * not mark the task idle while an earlier run is still going.
     */
    private final AtomicInteger inFlight = new AtomicInteger(0);

    /**
     * Creates a periodic record that repeats without limit.
     *
     * @param task   the task to run
     * @param delay  delay before the first run, in seconds
     * @param period scheduling period, in seconds
     */
    public TimerRecord(TimedTask task, int delay, int period) {
        this(task, delay, true, period, -1);
    }

    /**
     * Creates a one-shot record.
     *
     * @param task  the task to run
     * @param delay delay before the run, in seconds
     */
    public TimerRecord(TimedTask task, int delay) {
        this(task, delay, false, 0, 1);
    }

    /**
     * Creates a scheduling record.
     *
     * @param task        the task to run
     * @param delay       delay before the first run, in seconds
     * @param isPeriod    whether the task is periodic
     * @param period      scheduling period, in seconds
     * @param leftInvokes remaining number of invocations; negative means unlimited
     */
    public TimerRecord(TimedTask task, int delay, boolean isPeriod, int period, int leftInvokes) {
        init(task, delay, isPeriod, period, leftInvokes);
    }

    private void init(TimedTask task, int delay, boolean isPeriod, int period, int leftInvokes) {
        this.delay = delay;
        this.isPeriod = isPeriod;
        this.period = period;
        this.leftInvokes = leftInvokes;
        this.task = task;
    }

    /**
     * Adds {@code addDelay} to the delay and splits the sum into the
     * second / minute / hour / day fields read by {@link #getField(int)}.
     *
     * <p>The {@code delay} field is consumed by the split: afterwards it holds
     * only what is left after dividing by every unit radix.
     *
     * @param addDelay extra seconds to add before splitting
     */
    void initDelayUnit(int addDelay) {
        this.delay += addDelay;
        for (int i = unitValues.length - 1; i >= 0; i--) {
            unitValues[i] = delay % UNIT_MAX_VALUES[i];
            delay /= UNIT_MAX_VALUES[i];
        }
    }

    public final int getID() {
        return id;
    }

    @Override
    public int hashCode() {
        return id;
    }

    /** Two records are equal when they carry the same unique id. */
    @Override
    public boolean equals(Object obj) {
        if (obj instanceof TimerRecord) {
            return ((TimerRecord) obj).getID() == id;
        }
        return false;
    }

    public int getPeriod() {
        return period;
    }

    /**
     * Sets the scheduling period; it takes effect from the next scheduling.
     *
     * @param period the new period, in seconds
     */
    public void setPeriod(int period) {
        this.period = period;
    }

    /**
     * Returns the index of the coarsest unit needed to express the current
     * delay, e.g. {@link #SECOND} for 1..59 and {@link #MINUTE} for 60..3599.
     *
     * @return the unit index, or {@code UNITS.length} when the delay is 0
     */
    int getMaxFieldIndex() {
        int tmp = delay;
        if (tmp == 0) {
            return unitValues.length;
        }
        for (int i = UNIT_MAX_VALUES.length - 1; i >= 0; i--) {
            tmp /= UNIT_MAX_VALUES[i];
            if (tmp == 0) {
                return i;
            }
        }
        return unitValues.length;
    }

    /**
     * @param unit one of the {@link #UNITS} indexes
     * @return the digit computed for that unit by {@link #initDelayUnit(int)}
     */
    public int getField(int unit) {
        return unitValues[unit];
    }

    public boolean isPeriod() {
        return isPeriod;
    }

    /**
     * Bookkeeping after the record has been handed to the executor: for a
     * periodic record with invocations left, the delay is reset to the period
     * and a positive invocation budget is decreased by one.
     */
    final void whenInvoked() {
        int left = leftInvokes > 0 ? (leftInvokes - 1) : -1;
        if (isPeriod && leftInvokes != 0) {
            init(task, period, isPeriod, period, left);
        }
    }

    /**
     * @return {@code true} when the record is periodic and still has
     *         invocations left (a negative budget never runs out)
     */
    public final boolean canInvokeAgain() {
        return isPeriod ? leftInvokes != 0 : false;
    }

    /**
     * Runs the task, or its collision hook when an earlier run is still in
     * progress and the task does not allow overlapping runs.
     *
     * <p>Anything thrown by {@code task.run()} is printed and swallowed so
     * that a failing task cannot kill the executor thread.
     */
    @Override
    public void run() {
        if (!enter()) {
            task.doWhenCollision();
            return;
        }
        try {
            task.run();
        } catch (Throwable th) {
            th.printStackTrace();
        } finally {
            inFlight.decrementAndGet();
        }
    }

    /**
     * Registers this execution as in progress.
     *
     * @return {@code true} if the task may run now: either no other run is in
     *         progress or the task accepts collisions; {@code false} if the
     *         run must be skipped (the registration is rolled back)
     */
    private boolean enter() {
        if (inFlight.getAndIncrement() == 0) {
            return true;
        }
        boolean admitted = false;
        try {
            admitted = task.canPassWhenCollision();
        } finally {
            // Roll back on refusal, and also if the callback itself throws.
            if (!admitted) {
                inFlight.decrementAndGet();
            }
        }
        return admitted;
    }
}
import java.util.Calendar;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
/**
 * Timer-wheel scheduler with one-second resolution. Delays and periods of up
 * to {@link TimerRecord#MAX_DELAY} seconds (31 days) are supported.
 *
 * <p>The scheduler is its own thread: it is started by the constructor and
 * advances the wheels once per second until {@link #shutdown()} is called.
 * Tasks are not run on the scheduler thread but handed to an {@link Executor}.
 *
 * @author easezhang
 */
public class TimerWheel extends Thread {

    /** One ring of slots for a single time unit (day, hour, minute or second). */
    private static final class Wheel {
        final ConcurrentLinkedQueue<TimerRecord>[] slots;
        final int unit;
        volatile int pointer = 0;

        @SuppressWarnings("unchecked") // generic array creation; only TimerRecord queues are stored
        Wheel(int size, int unit) {
            this.unit = unit;
            slots = new ConcurrentLinkedQueue[size];
            for (int i = 0; i < size; i++) {
                slots[i] = new ConcurrentLinkedQueue<TimerRecord>();
            }
        }

        /** Slot for the record: its field for this unit, counted from the current pointer. */
        private int hash(TimerRecord record) {
            return (record.getField(unit) + pointer) % slots.length;
        }

        void push(TimerRecord record) {
            slots[hash(record)].offer(record);
        }

        /** Removes and returns one record from the current slot, or {@code null} if it is empty. */
        TimerRecord poll() {
            return slots[pointer].poll();
        }

        /**
         * Advances the pointer by one slot.
         *
         * @return {@code true} when the pointer wrapped around to slot 0
         */
        boolean tick() {
            int previous = pointer;
            pointer = (pointer + 1) % slots.length;
            return previous > pointer;
        }
    }

    private final Wheel[] wheels;
    private final Executor executor;
    /** Records submitted by callers, picked up by the scheduler thread on its next tick. */
    private final ConcurrentLinkedQueue<TimerRecord> toDo = new ConcurrentLinkedQueue<TimerRecord>();
    private static final int SLEEP_TIME = 1000;
    /** Written by {@link #shutdown()} from other threads, hence volatile. */
    private volatile boolean isRunning = true;

    /**
     * Creates and starts a scheduler that runs tasks on a cached thread pool,
     * which has no upper bound on the number of threads.
     *
     * @param name name of the scheduler thread, useful in stack dumps
     */
    public TimerWheel(String name) {
        this(name, Executors.newCachedThreadPool());
    }

    /**
     * Creates and starts a scheduler with second resolution for delays of up
     * to one month.
     *
     * @param name     name of the scheduler thread, useful in stack dumps
     * @param executor executor the tasks are run on
     * @throws IllegalArgumentException if {@code executor} is {@code null}
     */
    public TimerWheel(String name, Executor executor) {
        if (executor == null) {
            throw new IllegalArgumentException("executor must not be null");
        }
        setName(name);
        wheels = new Wheel[TimerRecord.UNITS.length];
        for (int i = 0; i < wheels.length; i++) {
            wheels[i] = new Wheel(TimerRecord.UNIT_MAX_VALUES[i], TimerRecord.UNITS[i]);
        }
        this.executor = executor;
        start();
    }

    /**
     * Submits a timed task.
     *
     * @param task        the task
     * @param delay       delay before the first run, in seconds
     * @param isPeriod    whether the task is periodic
     * @param period      period between runs, in seconds; only checked for periodic tasks
     * @param invokeTimes for a periodic task, the total number of runs
     *                    (0 behaves like 1), negative meaning unlimited;
     *                    a non-periodic task always runs once
     * @throws IllegalStateException    if the wheel has been shut down
     * @throws IllegalArgumentException if {@code task} is {@code null}, or
     *                                  {@code delay} or {@code period} is out of range
     */
    public void submit(TimedTask task, int delay, boolean isPeriod, int period, int invokeTimes) {
        if (task == null) {
            throw new IllegalArgumentException("task must not be null");
        }
        if (!isRunning) {
            throw new IllegalStateException("can only submit task when wheel is running");
        }
        if (delay > TimerRecord.MAX_DELAY || delay < 0) {
            throw new IllegalArgumentException("delay should be [0," + TimerRecord.MAX_DELAY + "]");
        }
        if (isPeriod && (period <= 0 || period > TimerRecord.MAX_DELAY)) {
            throw new IllegalArgumentException("period should be (0," + TimerRecord.MAX_DELAY + "]");
        }
        toDo.offer(new TimerRecord(task, delay, isPeriod, period, invokeTimes));
    }

    /**
     * Returns the number of seconds from now until the next occurrence of the
     * given local wall-clock time. If that time has already passed today, the
     * occurrence of tomorrow is used.
     *
     * @param hour hour of day in 24-hour format, 0-23 (24 is also accepted and,
     *             through the lenient calendar, means hour 0 of the next day)
     * @param min  minute, 0-59
     * @param sec  second, 0-59
     * @return the interval in seconds
     * @throws RuntimeException if an argument is out of range
     */
    public static int getDelayToNextTime(int hour, int min, int sec) {
        if (hour < 0 || hour > 24 || min < 0 || min > 59 || sec < 0 || sec > 59) {
            throw new IllegalArgumentException("param error");
        }
        Calendar now = Calendar.getInstance();
        int time = hour * 60 * 60 + min * 60 + sec;
        int nowTime = now.get(Calendar.HOUR_OF_DAY) * 60 * 60
                + now.get(Calendar.MINUTE) * 60
                + now.get(Calendar.SECOND);
        // Derive the target from the same instant as "now": a second clock
        // reading could fall on a different date when taken around midnight.
        Calendar target = (Calendar) now.clone();
        target.set(Calendar.HOUR_OF_DAY, hour);
        target.set(Calendar.MINUTE, min);
        target.set(Calendar.SECOND, sec);
        if (time < nowTime) {
            target.add(Calendar.DAY_OF_MONTH, 1);
        }
        return (int) ((target.getTimeInMillis() - now.getTimeInMillis()) / 1000);
    }

    /**
     * Submits a plain {@link Runnable} as a timed task. Collisions are allowed
     * for such a task, as with the {@link TimedTask} defaults.
     *
     * @param task        the task
     * @param delay       delay before the first run, in seconds
     * @param isPeriod    whether the task is periodic
     * @param period      period between runs, in seconds; only checked for periodic tasks
     * @param invokeTimes for a periodic task, the total number of runs
     *                    (0 behaves like 1), negative meaning unlimited;
     *                    a non-periodic task always runs once
     * @throws IllegalStateException    if the wheel has been shut down
     * @throws IllegalArgumentException if {@code task} is {@code null}, or
     *                                  {@code delay} or {@code period} is out of range
     */
    public void submit(final Runnable task, int delay, boolean isPeriod, int period, int invokeTimes) {
        if (task == null) {
            throw new IllegalArgumentException("task must not be null");
        }
        TimedTask timedTask = new TimedTask() {
            @Override
            public void run() {
                task.run();
            }
        };
        submit(timedTask, delay, isPeriod, period, invokeTimes);
    }

    /**
     * Returns how many seconds the wheels finer than {@code index} have
     * already advanced, i.e. the sum of their pointers weighted by unit size.
     */
    private int getWheelDelay(int index) {
        int delay = 0;
        int unit = 1;
        for (int i = wheels.length - 1; i > index; i--) {
            delay += wheels[i].pointer * unit;
            unit *= TimerRecord.UNIT_MAX_VALUES[i];
        }
        return delay;
    }

    /**
     * Places the record on the coarsest wheel for which it has a non-zero
     * field, or on the seconds wheel when every field is zero.
     */
    private void start(TimerRecord record) {
        record.initDelayUnit(getWheelDelay(record.getMaxFieldIndex()));
        for (int i = 0; i < wheels.length; i++) {
            if (record.getField(i) != 0) {
                wheels[i].push(record);
                return;
            }
        }
        wheels[wheels.length - 1].push(record);
    }

    /**
     * Hands the record to the executor and re-schedules it if it has
     * invocations left.
     *
     * <p>A failure to hand over (for example a rejection by the executor) is
     * printed and otherwise ignored, so that it neither aborts the current
     * tick nor drops a periodic record from the schedule. The invocation still
     * counts against the record's budget.
     */
    private void executeRecord(TimerRecord record) {
        try {
            executor.execute(record);
        } catch (RuntimeException ex) {
            ex.printStackTrace();
        }
        record.whenInvoked();
        if (record.canInvokeAgain()) {
            start(record);
        }
    }

    /**
     * Performs the work of one second: moves records in the current slots down
     * to the next finer wheel, executes those in the current seconds slot,
     * takes in newly submitted records, and advances the pointers.
     */
    private void tick() {
        TimerRecord record = null;
        final int last = wheels.length - 1;
        for (int index = 0; index <= last; index++) {
            while ((record = wheels[index].poll()) != null) {
                if (index < last) {
                    wheels[index + 1].push(record);
                } else {
                    executeRecord(record);
                }
            }
        }
        while ((record = toDo.poll()) != null) {
            start(record);
        }
        // Newly started records may have landed in the current seconds slot.
        while ((record = wheels[last].poll()) != null) {
            executeRecord(record);
        }
        // Advance the seconds wheel; carry into coarser wheels on wrap-around.
        for (int i = last; i >= 0; i--) {
            if (!wheels[i].tick()) {
                break;
            }
        }
    }

    /**
     * Scheduler loop: one {@link #tick()} per {@code SLEEP_TIME} milliseconds.
     * Wake-up times are computed from the start time rather than from the end
     * of each tick, so a slow tick does not shift the following ones.
     */
    @Override
    public void run() {
        long now = 0L;
        final long start = System.currentTimeMillis();
        long end = start;
        while (isRunning) {
            try {
                end += SLEEP_TIME;
                tick();
                now = System.currentTimeMillis();
                if (now < end) {
                    Thread.sleep(end - now);
                }
            } catch (InterruptedException ignored) {
                // interrupt() is disabled for this thread; the loop is ended
                // through shutdown() only.
            } catch (Throwable th) {
                th.printStackTrace();
            }
        }
    }

    /**
     * Deliberately does nothing: interrupting the scheduler thread is
     * disabled. Use {@link #shutdown()} to stop it.
     */
    @Override
    public final void interrupt() {
    }

    /**
     * Stops the scheduler. Tasks that are already executing continue to run.
     * The scheduler thread ends after its current tick and sleep.
     */
    public void shutdown() {
        isRunning = false;
    }

    /**
     * Small manual demo: a three-second task scheduled every second, so that
     * every run after the first one collides with the previous run.
     */
    public static void main(String... args) {
        // newFixedThreadPool rejects a size of 0, which a single-CPU machine would produce.
        int threads = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
        Executor executor = Executors.newFixedThreadPool(threads);
        TimerWheel wheel = new TimerWheel("NYPeriodTask", executor);
        wheel.submit(new TimedTask() {
            @Override
            public boolean canPassWhenCollision() {
                return false;
            }

            @Override
            public void doWhenCollision() {
                System.out.println("collision");
            }

            @Override
            public void run() {
                try {
                    System.out.println("running");
                    Thread.sleep(3000);
                    System.out.println("end");
                } catch (Throwable th) {
                    // demo only: nothing to clean up
                }
            }
        }, 1, true, 1, -1);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment