Skip to content

Instantly share code, notes, and snippets.

@Blquinn
Last active April 29, 2024 22:53
Show Gist options
  • Save Blquinn/119df3a4decb70cdd9b1175df6e6ff72 to your computer and use it in GitHub Desktop.
Save Blquinn/119df3a4decb70cdd9b1175df6e6ff72 to your computer and use it in GitHub Desktop.
Global scheduler lock implementation using Redis; daily scheduled task using Redis + a database
package main
import (
"github.com/go-redis/redis"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
"github.com/robfig/cron"
"github.com/sirupsen/logrus"
"time"
)
// init sets up the process-wide logrus logger: debug-level verbosity with the
// plain text formatter.
func init() {
	logrus.SetLevel(logrus.DebugLevel)
	logrus.SetFormatter(new(logrus.TextFormatter))
}
// main wires up the demo: it connects to Postgres and Redis, deletes all rows
// from task_log, registers two jobs wrapped in AtomicCronTask and then runs
// the cron scheduler.
//
// Any setup failure (database, job registration) panics, since there is
// nothing useful the program can do without them.
func main() {
	db := sqlx.MustConnect("postgres", "host=localhost port=5432 user=ben password=password dbname=globalcron sslmode=disable")

	// Clear out log rows left behind by earlier runs.
	if _, err := db.Exec(` delete from task_log; `); err != nil {
		panic(err)
	}

	client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	c := cron.New()

	// Each registration is checked on its own; previously the error from the
	// first AddJob was overwritten by the second call before being inspected.
	err := c.AddJob("*/5 * * * * *", newAtomicCronTask(
		"print",
		client,
		func() {
			logrus.Infof("Running job: the time is %s", time.Now().Format(time.RFC3339))
		},
		2*time.Second))
	if err != nil {
		panic(err)
	}

	err = c.AddJob("*/5 * * * * *", newAtomicCronTask(
		"daily",
		client,
		onceDailyCron(db, "daily", func() {
			logrus.Println("Running once daily cron")
		}),
		2*time.Second))
	if err != nil {
		panic(err)
	}

	c.Run()
}
// AtomicCronTask wraps a job function with a Redis-backed schedule lock so
// that, of all the processes sharing the same Redis instance, only one
// executes the job for a given tick.
type AtomicCronTask struct {
	// name is the Redis key used as the lock for this task.
	name string
	// fn is the job body to execute once the lock has been claimed.
	fn func()
	// lockDur is the expiry applied to the lock key when it is set.
	lockDur time.Duration
	// redis is the client used to read and set the lock key.
	redis *redis.Client
}
// newAtomicCronTask builds an AtomicCronTask that guards fun with a Redis lock.
//
// The lock key is name prefixed with "atomic_job:"; r is the Redis client used
// for the lock and lockDur is the expiry given to the key when it is set.
func newAtomicCronTask(name string, r *redis.Client, fun func(), lockDur time.Duration) *AtomicCronTask {
	task := new(AtomicCronTask)
	task.name = "atomic_job:" + name
	task.redis = r
	task.lockDur = lockDur
	task.fn = fun
	return task
}
// Run implements cron.Job. It tries to claim the task's lock key in Redis and
// runs the wrapped job only if the claim succeeds.
//
// The claim is made inside a WATCH transaction on the lock key: if the key is
// absent it is set (with lockDur as expiry) through a MULTI/EXEC pipeline, so
// a concurrent writer touching the key makes the transaction fail with
// redis.TxFailedErr instead of letting two processes both claim it. If the key
// already exists, or Redis cannot be read, the tick is skipped.
//
// The job itself is executed after Watch has returned rather than inside the
// callback, so the Redis transaction is not kept open while the job runs.
func (s *AtomicCronTask) Run() {
	acquired := false

	err := s.redis.Watch(func(tx *redis.Tx) error {
		_, err := tx.Get(s.name).Result()
		switch {
		case err == redis.Nil:
			// No lock key present: try to claim it atomically.
			pipe := tx.TxPipeline()
			pipe.Set(s.name, time.Now().Format(time.RFC3339Nano), s.lockDur)
			if _, err := pipe.Exec(); err != nil {
				return err
			}
			acquired = true
		case err != nil:
			// A read failure is logged and the tick skipped; it is not
			// reported as a transaction error.
			logrus.Errorf("got err while getting key from redis: %v", err)
		default:
			logrus.Debugf("key exists, skipping task %s", s.name)
		}
		return nil
	}, s.name)

	if err != nil {
		if err == redis.TxFailedErr {
			// Another client modified the watched key between GET and EXEC.
			logrus.Debugln("race condition avoided via transaction")
		} else {
			logrus.Errorf("got error during redis transaction: %v", err)
		}
		return
	}

	if !acquired {
		return
	}

	logrus.Debugf("set key running task %s", s.name)
	s.fn()
}
// Compile-time check that *AtomicCronTask satisfies cron.Job.
var _ cron.Job = (*AtomicCronTask)(nil)
// onceDailyCron wraps fn so that it runs at most once per day, using the
// task_log table as the record of past runs.
//
// On every invocation the returned function counts task_log rows for name
// whose run_time is later than the start of the current day. If there are
// none, it first inserts a row for the current time and then calls fn;
// otherwise it skips. Database errors are logged and the invocation is
// abandoned without calling fn.
//
// NOTE(review): the day boundary comes from time.Truncate(24*time.Hour),
// which truncates on absolute time and therefore lands on UTC midnight, not
// local midnight — confirm that this is the intended definition of "daily".
func onceDailyCron(db *sqlx.DB, name string, fn func()) func() {
	return func() {
		now := time.Now()

		var n int
		err := db.Get(&n, `
select count(*)
from task_log
where task_name = $1
and run_time > $2
`, name, now.Truncate(24*time.Hour))
		if err != nil {
			logrus.Errorf("Got err while getting task log entry: %v", err)
			return
		}

		if n >= 1 {
			logrus.Debugln("Skipping daily task due to existing entry")
			return
		}

		// Record the run before executing fn so a later tick sees the entry
		// even while fn is still running.
		_, execErr := db.Exec(`
insert into task_log (task_name, run_time)
values ($1, $2);
`, name, now)
		if execErr != nil {
			// Errorf, not Errorln: the message contains a %v verb.
			logrus.Errorf("Failed to insert daily cron entry: %v", execErr)
			return
		}

		fn()
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment