Last active
July 10, 2017 08:24
-
-
Save JunqiangYang/c10c02ee2b4be859370bd056fdda117a to your computer and use it in GitHub Desktop.
timer 定时器本地实现
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Calendar; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
/** | |
* @author Junqiang.Yang | |
* @create 2017-06-30 16:07 | |
**/ | |
public class Demo { | |
private Executor executor; | |
private TimerWheel wheel; | |
private boolean isInit = false; | |
private volatile static AtomicBoolean boot_start = new AtomicBoolean(false); // boot未启动 | |
private volatile static AtomicBoolean repayment_start = new AtomicBoolean(false); // 还款未启动 | |
private void addMidDetailTask() { | |
wheel.submit(new TimedTask() { | |
@Override | |
public boolean canPassWhenCollision() { | |
return false; | |
} | |
@Override | |
public void doWhenCollision() { | |
BizLogger.info("addMidDetailTask Task Collision"); | |
} | |
@Override | |
public void run() { | |
BizLogger.info("begin addMidDetailTask"); | |
Calendar now = Calendar.getInstance(); | |
/** | |
* 任务 | |
*/ | |
BizLogger.info("end addMidDetailTask"); | |
} | |
}, TimerWheel.getDelayToNextTime(1, 50, 0), true, 86400, -1); | |
} | |
public void init() { | |
BizLogger.info(" role is " + AppConfig.get("app.role", "")); | |
addMidDetailTask() ; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Base class for tasks handled by the timer wheel. Every hook has a default
 * implementation; subclasses override only what they need.
 */
public class TimedTask implements Runnable {

    /** Creates a task whose hooks all use the defaults of this class. */
    public TimedTask() {
    }

    /**
     * Decides what happens on a collision, i.e. when the task is scheduled
     * again while the previous invocation has not finished yet.
     *
     * @return {@code true} if the task body should still be executed on a
     *         collision; this default implementation always returns {@code true}
     */
    public boolean canPassWhenCollision() {
        return true;
    }

    /**
     * Hook invoked on a collision when the task body is not executed.
     * Does nothing by default.
     */
    public void doWhenCollision() {
        // intentionally empty
    }

    /**
     * The regular task body. Does nothing by default.
     */
    @Override
    public void run() {
        // intentionally empty
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicInteger; | |
class TimerRecord implements Runnable { | |
private final static AtomicInteger counter = new AtomicInteger(0); | |
//唯一id | |
private int id = counter.getAndAdd(1); | |
public static final int DAY_OF_MONTH = 0; | |
public static final int HOUR_OF_DAY = 1; | |
public static final int MINUTE = 2; | |
public static final int SECOND = 3; | |
public static final int[] UNITS = {DAY_OF_MONTH, HOUR_OF_DAY, MINUTE, SECOND}; | |
public static final int[] UNIT_MAX_VALUES = {32, 24, 60, 60}; | |
public static final int MAX_DELAY = 31*24*60*60; | |
private int[] unitValues = new int[UNITS.length]; | |
//是否是周期任务 | |
private boolean isPeriod; | |
private int delay; | |
//调度周期 | |
private int period; | |
//剩余调用次数 | |
private int leftInvokes; | |
private TimedTask task; | |
private volatile boolean isRunning = false; | |
public TimerRecord(TimedTask task, int delay, int period) { | |
this(task, delay, true, period, -1); | |
} | |
public TimerRecord(TimedTask task, int delay) { | |
this(task, delay, false, 0, 1); | |
} | |
/** | |
* 定时调度器构造 | |
* @param id 唯一ID | |
* @param delay 首次执行延迟时间, 单位为秒 | |
* @param isPeriod 是否是周期任务 | |
* @param period 调度周期,单位为秒 | |
* @param leftInvokes 剩余调度次数,负数表示无限大 | |
*/ | |
public TimerRecord(TimedTask task, int delay, boolean isPeriod, int period, int leftInvokes) { | |
init(task, delay, isPeriod, period, leftInvokes); | |
} | |
private void init(TimedTask task, int delay, boolean isPeriod, int period, int leftInvokes) { | |
this.delay = delay; | |
this.isPeriod = isPeriod; | |
this.period = period; | |
this.leftInvokes = leftInvokes; | |
this.task = task; | |
} | |
void initDelayUnit(int addDelay) { | |
this.delay+=addDelay; | |
for(int i=unitValues.length-1;i>=0;i--) { | |
unitValues[i] = delay%UNIT_MAX_VALUES[i]; | |
delay/=UNIT_MAX_VALUES[i]; | |
} | |
} | |
public final int getID() { | |
return id; | |
} | |
public int hashCode() { | |
return id; | |
} | |
public boolean equals(Object obj) { | |
if(obj == null) { | |
return false; | |
} | |
else if(obj instanceof TimerRecord) { | |
return ((TimerRecord)obj).getID() == id; | |
} | |
return false; | |
} | |
public int getPeriod() { | |
return period; | |
} | |
/** | |
* 设置调度周期,下次调度生效 | |
* @param period | |
*/ | |
public void setPeriod(int period) { | |
this.period = period; | |
} | |
int getMaxFieldIndex() { | |
int tmp = delay; | |
if(tmp == 0) { | |
return unitValues.length; | |
} | |
for(int i=UNIT_MAX_VALUES.length-1;i>=0;i--) { | |
tmp/=UNIT_MAX_VALUES[i]; | |
if(tmp == 0) { | |
return i; | |
} | |
} | |
return unitValues.length; | |
} | |
public int getField(int unit) { | |
return unitValues[unit]; | |
} | |
public boolean isPeriod() { | |
return isPeriod; | |
} | |
final void whenInvoked() { | |
int left = leftInvokes > 0?(leftInvokes-1):-1; | |
if(isPeriod && leftInvokes !=0) { | |
init(task, period, isPeriod, period, left); | |
} | |
} | |
public final boolean canInvokeAgain() { | |
return isPeriod?leftInvokes!=0:false; | |
} | |
@Override | |
public void run() { | |
if(this.isRunning) { | |
if(task.canPassWhenCollision()) { | |
task.run(); | |
this.isRunning = false; | |
} | |
else { | |
task.doWhenCollision(); | |
} | |
} | |
else { | |
this.isRunning = true; | |
try { | |
task.run(); | |
} | |
catch(Throwable th) { | |
th.printStackTrace(); | |
} | |
finally { | |
this.isRunning = false; | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Calendar; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.Executors; | |
/** | |
* 时间轮调度器,实现秒级调度,最大支持1个月内的时间调度 | |
* @author easezhang | |
* | |
*/ | |
public class TimerWheel extends Thread { | |
private class Wheel { | |
ConcurrentLinkedQueue<TimerRecord>[] slots; | |
final int unit; | |
volatile int pointer = 0; | |
@SuppressWarnings("unchecked") | |
public Wheel(int size, int unit) { | |
this.unit = unit; | |
slots = new ConcurrentLinkedQueue[size]; | |
for(int i=0;i<size;i++) { | |
slots[i] = new ConcurrentLinkedQueue<TimerRecord>(); | |
} | |
} | |
private int hash(TimerRecord record) { | |
return (record.getField(unit)+pointer)%slots.length; | |
} | |
public void push(TimerRecord record) { | |
int index = hash(record); | |
slots[index].offer(record); | |
//监控用 | |
// StringBuffer sb = new StringBuffer(); | |
// for(int i=0;i<wheels.length;i++) { | |
// sb.append(wheels[i].pointer+"|"); | |
// } | |
// System.out.println(new java.util.Date()+": push task to unit="+unit+" pointer="+pointer+" index="+index +"||"+sb.toString()); | |
} | |
public TimerRecord poll() { | |
return slots[pointer].poll(); | |
} | |
public boolean tick() { | |
int tmp = pointer; | |
pointer = (pointer+1)%slots.length; | |
return tmp > pointer; | |
} | |
} | |
private Wheel[] wheels; | |
private Executor executor; | |
private ConcurrentLinkedQueue<TimerRecord> toDo = new ConcurrentLinkedQueue<TimerRecord>(); | |
private static final int SLEEP_TIME = 1000; | |
private boolean isRunning = true; | |
/** | |
* 线程池构造,默认采用cache线程池,没有上限大小 | |
* @param name 时间轮线程名称,用于stack跟踪 | |
*/ | |
public TimerWheel(String name) { | |
this(name, Executors.newCachedThreadPool()); | |
} | |
/** | |
* 调度器构造,支持一个月内的秒级调度 | |
* @param name 线程名称,用于stack跟踪 | |
* @param executor 任务执行器 | |
*/ | |
public TimerWheel(String name, Executor executor) { | |
setName(name); | |
wheels = new Wheel[TimerRecord.UNITS.length]; | |
for(int i=0;i<wheels.length;i++) { | |
wheels[i] = new Wheel(TimerRecord.UNIT_MAX_VALUES[i], TimerRecord.UNITS[i]); | |
} | |
this.executor = executor; | |
start(); | |
} | |
/** | |
* 提交定时任务 | |
* @param task 任务 | |
* @param delay 首次执行延迟时间,单位是秒 | |
* @param isPeriod 是否为周期任务 | |
* @param period 执行周期,单位是秒 | |
* @param invokeTimes 除首次之外,还要周期执行多少次,负数表示无限次 | |
*/ | |
public void submit(TimedTask task, int delay, boolean isPeriod, int period, int invokeTimes ) { | |
if(!isRunning) { | |
throw new RuntimeException("can only submit task when wheel is running"); | |
} | |
if(delay>TimerRecord.MAX_DELAY || delay<0) { | |
throw new RuntimeException("delay should be [0,"+TimerRecord.MAX_DELAY+"]"); | |
} | |
if(isPeriod && (period<=0||period>TimerRecord.MAX_DELAY)) { | |
throw new RuntimeException("period should be (0,"+TimerRecord.MAX_DELAY+"]"); | |
} | |
TimerRecord record = new TimerRecord(task, delay, isPeriod, period, invokeTimes); | |
toDo.offer(record); | |
} | |
/** | |
* 获取到下个时间点的时间间隔 | |
* @param hour 小时 按24小时制,取值0~23 | |
* @param min 分钟0-59 | |
* @param sec 秒 0-59 | |
* @return 时间间隔 | |
*/ | |
public static int getDelayToNextTime(int hour, int min, int sec) { | |
if(hour<0||hour>24||min<0||min>59||sec<0||sec>59) { | |
throw new RuntimeException("param error"); | |
} | |
Calendar now = Calendar.getInstance(); | |
int time = hour*60*60+min*60+sec; | |
int nowTime = now.get(Calendar.HOUR_OF_DAY)*60*60+now.get(Calendar.MINUTE)*60+now.get(Calendar.SECOND); | |
Calendar tmp= Calendar.getInstance(); | |
tmp.set(Calendar.HOUR_OF_DAY, hour); | |
tmp.set(Calendar.MINUTE, min); | |
tmp.set(Calendar.SECOND, sec); | |
if(time < nowTime) { | |
tmp.add(Calendar.DAY_OF_MONTH, 1); | |
} | |
return (int)((tmp.getTimeInMillis() - now.getTimeInMillis())/1000); | |
} | |
/** | |
* 提交定时任务 | |
* @param task 任务 | |
* @param delay 首次执行延迟时间,单位是秒 | |
* @param isPeriod 是否为周期任务 | |
* @param period 执行周期,单位是秒 | |
* @param invokeTimes 除首次之外,还要周期执行多少次,负数表示无限次 | |
*/ | |
public void submit(final Runnable task, int delay, boolean isPeriod, int period, int invokeTimes) { | |
TimedTask timeTask = new TimedTask() { | |
public void run() {task.run();} | |
}; | |
submit(timeTask, delay, isPeriod, period, invokeTimes); | |
} | |
private int getWheelDelay(int index) { | |
int delay = 0; | |
int unit = 1; | |
for(int i=wheels.length-1;i>index;i--) { | |
delay +=wheels[i].pointer * unit; | |
unit*=TimerRecord.UNIT_MAX_VALUES[i]; | |
} | |
return delay; | |
} | |
private void start(TimerRecord record) { | |
record.initDelayUnit(getWheelDelay(record.getMaxFieldIndex())); | |
for(int i=0;i<wheels.length;i++) { | |
if(record.getField(i)!=0) { | |
wheels[i].push(record); | |
return; | |
} | |
} | |
wheels[wheels.length-1].push(record); | |
} | |
private void executeRecord(TimerRecord record) { | |
executor.execute(record); | |
record.whenInvoked(); | |
if(record.canInvokeAgain()) { | |
start(record); | |
} | |
} | |
private void tick() { | |
TimerRecord record = null; | |
int index=0; | |
while(index<=wheels.length-1) { | |
while((record=wheels[index].poll())!=null) { | |
if(index<wheels.length-1) { | |
wheels[index+1].push(record); | |
} | |
else { | |
executeRecord(record); | |
} | |
} | |
++index; | |
} | |
while((record = toDo.poll())!=null) { | |
start(record); | |
} | |
while((record=wheels[wheels.length-1].poll())!=null) { | |
executeRecord(record); | |
} | |
for(int i=wheels.length-1;i>=0;i--) { | |
if(!wheels[i].tick()) { | |
break; | |
} | |
} | |
} | |
public void run() { | |
long now = 0L; | |
final long start = System.currentTimeMillis(); | |
long end = start; | |
while(isRunning) { | |
try { | |
end += SLEEP_TIME; | |
tick(); | |
now = System.currentTimeMillis(); | |
if(now<end) { | |
Thread.sleep(end-now); | |
} | |
} | |
catch(InterruptedException e) { | |
} | |
catch(Throwable th) { | |
th.printStackTrace(); | |
} | |
} | |
} | |
public final void interrupt() { | |
//屏蔽此函数 | |
} | |
/** | |
* 关闭定时调度器,已经执行中的任务将继续执行 | |
*/ | |
public void shutdown() { | |
isRunning = false; | |
} | |
public static void main(String ...args) { | |
Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() - 1); | |
TimerWheel wheel = new TimerWheel("NYPeriodTask", executor); | |
wheel.submit(new TimedTask() { | |
@Override | |
public boolean canPassWhenCollision() { | |
return false; | |
} | |
@Override | |
public void doWhenCollision() { | |
System.out.println("collision"); | |
} | |
@Override | |
public void run() { | |
try { | |
System.out.println("running"); | |
Thread.sleep(3000); | |
System.out.println("end"); | |
} | |
catch(Throwable th) {} | |
} | |
}, 1, true, 1, -1); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment