Skip to content

Instantly share code, notes, and snippets.

@aclisp
Last active November 2, 2023 08:38
Show Gist options
  • Save aclisp/34a242f069489a50de9ddffe5c4f3113 to your computer and use it in GitHub Desktop.
Save aclisp/34a242f069489a50de9ddffe5c4f3113 to your computer and use it in GitHub Desktop.
优雅结束的定时任务
package job
import (
"fmt"
"git.yy.com/ihago/yalert/v2"
"git.yy.com/ihago/ylog"
"git.yy.com/ihago/ylog/zap"
"git.yy.com/ihago/yruntime"
"github.com/robfig/cron/v3"
"hago-room-srv-metrics2/util"
"time"
)
// Cron is a cron-style job scheduler built on top of robfig/cron.
type Cron struct {
// started is set by Start and cleared by Stop, so repeated calls to either are no-ops.
started bool
// cron is the underlying scheduler; it is created in Start.
cron *cron.Cron
}
// Schedule is a single timetable entry: when to run and what to run.
type Schedule struct {
// Spec is the scheduling period. It follows the cron specification,
// see https://en.wikipedia.org/wiki/Cron and https://crontab.guru/ for reference.
Spec string
// Cmd is the function registered with the scheduler for this Spec.
Cmd func()
}
// panicAlerter wraps a cron job so that a panic raised while the job runs is
// recovered, sent out as an alert and written to the error log instead of
// propagating. Its signature matches what cron.WithChain expects.
func (c *Cron) panicAlerter(j cron.Job) cron.Job {
	guarded := func() {
		defer func() {
			cause := recover()
			if cause == nil {
				return
			}
			stack, _ := yruntime.CallStack(1, 10) // skip defer func()
			text := fmt.Sprintf("cron job panic: %v \n statck: %s", cause, stack)
			yalert.Alert(yalert.DefaultMsgFormat(text), yalert.WeChat(wechatRestrainConf))
			ylog.Error("cron job panic", zap.Any("error", cause), zap.String("stack", stack))
		}()
		j.Run()
	}
	return cron.FuncJob(guarded)
}
// Start creates the scheduler, registers every entry of ss and starts it.
// Calling Start on an already started Cron does nothing. An entry whose Spec
// is rejected by the scheduler is reported through util.AlertAndLog and
// skipped; the remaining entries are still registered.
func (c *Cron) Start(ss []Schedule) {
	if c.started {
		return
	}
	c.started = true

	// Every registered job is wrapped by panicAlerter.
	scheduler := cron.New(cron.WithChain(c.panicAlerter))
	c.cron = scheduler
	for _, entry := range ss {
		_, err := scheduler.AddFunc(entry.Spec, entry.Cmd)
		if err == nil {
			continue
		}
		util.AlertAndLog(fmt.Sprintf("cron.AddFunc %q: %v", entry.Spec, err))
	}
	scheduler.Start()
}
// Stop stops the scheduler and then waits, for at most one second, on the
// context returned by the scheduler's Stop. If that context is not done in
// time an error is logged. Calling Stop on a Cron that is not started does
// nothing.
func (c *Cron) Stop() {
	if c.started {
		finished := c.cron.Stop()
		select {
		case <-time.After(time.Second):
			ylog.Error("running cron jobs is not completed")
		case <-finished.Done():
		}
		c.started = false
	}
}
// Start launches the two periodic jobs of this module: one driven by m.tick
// and one driven by m.cron.
func (m *RoomPlayerNum) Start() {
// Runs m.everyMinute, decorated by dao.WithIsolation, with a 10-second period.
// NOTE(review): the method name suggests a per-minute cadence but the period here is 10s — confirm this is intended.
m.tick.Start(10*time.Second, dao.WithIsolation(m.everyMinute))
// Runs m.everyDay, decorated by dao.WithLeader, on the spec "* * * * *".
// NOTE(review): in standard five-field cron syntax this spec presumably fires every minute, not daily — confirm this is intended.
// The dao decorators derive their lock names from the file and line of these call sites, so the two
// calls must stay on separate lines to get distinct lock names.
m.cron.Start([]job.Schedule{{Spec: "* * * * *", Cmd: dao.WithLeader(m.everyDay)}})
}
// Stop stops the tick job first and then the cron job.
func (m *RoomPlayerNum) Stop() {
m.tick.Stop()
m.cron.Stop()
}
// Decorators for the f func() of a scheduled job.
package dao
import (
"fmt"
"git.yy.com/ihago/ycommon/elect"
"git.yy.com/ihago/ylog"
"git.yy.com/ihago/ylog/zap"
"hago-room-srv-metrics2/config"
"path/filepath"
"runtime"
"time"
)
// WithDistributedLock returns a function that runs f while holding a
// Redis-based distributed lock.
//
// The lock key is config.MyName + ":distributed_lock:" + name and the lock is
// requested with an expiry of d. On each invocation of the returned function:
//   - if acquiring the lock returns an error, the error is logged and f is not run;
//   - if the lock is not obtained, f is silently skipped
//     (presumably another holder owns it — verify against mRedis.Lock);
//   - otherwise f runs and the lock is released afterwards, even if f panics.
//
// A failure to release the lock is logged and otherwise ignored.
func WithDistributedLock(name string, d time.Duration, f func()) func() {
	return func() {
		lockName := config.MyName + ":distributed_lock:" + name
		lockTime := d.Milliseconds()

		ok, val, err := mRedis.Lock(lockName, lockTime)
		if err != nil {
			// Include the error itself; without it the log cannot tell why locking failed.
			ylog.Error("redis lock fail", zap.String("lockName", lockName), zap.Int64("lockTime", lockTime), zap.Any("error", err))
			return
		}
		if !ok {
			return
		}

		defer func() { // unlock even if f panics
			if err := mRedis.Unlock(lockName, val); err != nil {
				ylog.Error("redis unlock fail", zap.String("lockName", lockName), zap.Int64("lockTime", lockTime), zap.Any("error", err))
			}
		}()
		f()
	}
}
// WithLeader decorates f so that it runs only on the leader among multiple
// service instances. An instance counts as leader when
// elect.IsMaxNode(mService) reports true at invocation time. The leader's run
// is additionally guarded by a one-minute distributed lock whose name is
// derived from the file and line of the WithLeader call site.
func WithLeader(f func()) func() {
	// lockNameByFileLine inspects a fixed stack depth, so it has to be called
	// directly from this function.
	name := lockNameByFileLine()
	ylog.Debug("leader lock name", zap.String("lockName", name))

	locked := WithDistributedLock(name, time.Minute, f)
	return func() {
		if !elect.IsMaxNode(mService) {
			return
		}
		locked()
	}
}
// lockNameByFileLine builds a lock name of the form "<dir>:<file>:<line>"
// from the source location two stack frames above this function, i.e. the
// caller of whichever function called lockNameByFileLine. <dir> is only the
// last element of the directory path.
func lockNameByFileLine() string {
	_, callerPath, callerLine, _ := runtime.Caller(2)
	parent, base := filepath.Split(callerPath)
	return fmt.Sprintf("%s:%s:%d", filepath.Base(parent), base, callerLine)
}
// WithIsolation decorates f so that multiple service instances execute it
// serially: every run is guarded by a one-minute distributed lock whose name
// is derived from the file and line of the WithIsolation call site.
func WithIsolation(f func()) func() {
	// lockNameByFileLine inspects a fixed stack depth, so it has to be called
	// directly from this function.
	name := lockNameByFileLine()
	ylog.Debug("isolation lock name", zap.String("lockName", name))

	serialized := WithDistributedLock(name, time.Minute, f)
	return serialized
}
// Stop stops every metric computation module and returns once all of them
// have been stopped.
func Stop() {
	// Stopping a module blocks until its scheduled jobs finish, so the modules
	// are stopped in parallel to save time.
	var group errgroup.Group
	for _, module := range metrics {
		module := module // https://golang.org/doc/faq#closures_and_goroutines
		group.Go(func() error {
			module.Stop()
			return nil
		})
	}
	_ = group.Wait() // the goroutines above always return nil
}
package job
import (
"context"
"fmt"
"git.yy.com/ihago/yalert/v2"
"git.yy.com/ihago/ylog"
"git.yy.com/ihago/ylog/zap"
"git.yy.com/ihago/yruntime"
"time"
)
// Tick is a fixed-interval periodic job runner.
type Tick struct {
// started is set by Start and cleared by Stop, so repeated calls to either are no-ops.
started bool
// ctx is canceled by the worker goroutine once its loop has exited; Stop waits on it.
ctx context.Context
// cancel asks the worker goroutine to stop; Stop calls it.
cancel context.CancelFunc
}
// panicAlerter wraps f so that a panic raised while f runs is recovered, sent
// out as an alert and written to the error log instead of propagating.
func (t *Tick) panicAlerter(f func()) func() {
	guarded := func() {
		defer func() {
			cause := recover()
			if cause == nil {
				return
			}
			stack, _ := yruntime.CallStack(1, 10) // skip defer func()
			text := fmt.Sprintf("tick job panic: %v \n statck: %s", cause, stack)
			yalert.Alert(yalert.DefaultMsgFormat(text), yalert.WeChat(wechatRestrainConf))
			ylog.Error("tick job panic", zap.Any("error", cause), zap.String("stack", stack))
		}()
		f()
	}
	return guarded
}
// Start launches a goroutine that runs f once every period d until Stop is
// called. Calling Start on an already started Tick does nothing. f is wrapped
// by panicAlerter, so a panic in f does not end the loop.
func (t *Tick) Start(d time.Duration, f func()) {
	if t.started {
		return
	}
	t.started = true

	// Two independent contexts: stopCtx carries the stop request from Stop to
	// the worker, t.ctx carries the "worker has exited" signal back to Stop.
	stopCtx, stop := context.WithCancel(context.Background())
	t.cancel = stop
	doneCtx, done := context.WithCancel(context.Background())
	t.ctx = doneCtx

	job := t.panicAlerter(f)
	go func() {
		ticker := time.NewTicker(d)
		defer ticker.Stop()

		for running := true; running; {
			select {
			case <-stopCtx.Done():
				running = false
			case <-ticker.C:
				job()
				// A stop request may have arrived while the job was running;
				// leave without waiting for another select round.
				running = stopCtx.Err() == nil
			}
		}
		done()
	}()
}
// Stop asks the worker goroutine to exit and then waits, for at most one
// second, for it to signal that it has done so. If the signal does not arrive
// in time an error is logged. Calling Stop on a Tick that is not started does
// nothing.
func (t *Tick) Stop() {
	if t.started {
		t.cancel()
		finished := t.ctx.Done()
		select {
		case <-time.After(time.Second):
			ylog.Error("running tick jobs is not completed")
		case <-finished:
		}
		t.started = false
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment