Skip to content

Instantly share code, notes, and snippets.

@jasoet
Last active June 20, 2024 02:37
Show Gist options
  • Save jasoet/39bd884e629deb4790ba7503f738c26e to your computer and use it in GitHub Desktop.
Save jasoet/39bd884e629deb4790ba7503f738c26e to your computer and use it in GitHub Desktop.
package job
import (
"fmt"
"github.com/go-co-op/gocron"
"reflect"
"strings"
"sync"
"time"
)
// Cron describes a job that is scheduled from a cron expression.
type Cron struct {
// ID is the key under which the job is tracked; WithCron rejects a blank ID.
ID string
// Expression is handed unchanged to the gocron scheduler's Cron method.
Expression string
// Task is the function to run; WithCron rejects nil and non-function values.
Task any
// Params are the arguments passed to Task; their count and types must
// match Task's signature.
Params []any
}
// WithCron builds an Option that registers cron.Task on a scheduler using
// cron.Expression, running it in singleton mode.
//
// The ID and the task signature are checked when WithCron is called. If
// either check fails, the returned Option does not touch the scheduler and
// simply reports that error when it is applied.
func WithCron(cron Cron) Option {
	// fail wraps a validation error in an Option that only reports it.
	fail := func(reason error) Option {
		return func(*gocron.Scheduler) (string, *gocron.Job, error) {
			return "", nil, reason
		}
	}

	if strings.TrimSpace(cron.ID) == "" {
		return fail(fmt.Errorf("id cannot be blank"))
	}
	if invalid := validateTask(cron.Task, cron.Params); invalid != nil {
		return fail(invalid)
	}

	return func(sc *gocron.Scheduler) (string, *gocron.Job, error) {
		registered, err := sc.Cron(cron.Expression).SingletonMode().Do(cron.Task, cron.Params...)
		if err != nil {
			return "", nil, err
		}
		return cron.ID, registered, nil
	}
}
// Interval describes a job that runs repeatedly at a fixed duration.
type Interval struct {
// ID is the key under which the job is tracked; WithInterval rejects a blank ID.
ID string
// Every is the duration between runs; WithInterval rejects values below 2s.
Every time.Duration
// Task is the function to run; WithInterval rejects nil and non-function values.
Task any
// Params are the arguments passed to Task; their count and types must
// match Task's signature.
Params []any
}
// WithInterval builds an Option that registers schedule.Task on a scheduler
// to run every schedule.Every, in singleton mode.
//
// The ID, the task signature and the minimum interval (2s) are checked, in
// that order, when WithInterval is called. If a check fails, the returned
// Option does not touch the scheduler and simply reports that error when it
// is applied.
func WithInterval(schedule Interval) Option {
	// fail wraps a validation error in an Option that only reports it.
	fail := func(reason error) Option {
		return func(*gocron.Scheduler) (string, *gocron.Job, error) {
			return "", nil, reason
		}
	}

	if strings.TrimSpace(schedule.ID) == "" {
		return fail(fmt.Errorf("id cannot be blank"))
	}
	if invalid := validateTask(schedule.Task, schedule.Params); invalid != nil {
		return fail(invalid)
	}
	if schedule.Every < 2*time.Second {
		return fail(fmt.Errorf("duration cannot be less than 2s"))
	}

	return func(sc *gocron.Scheduler) (string, *gocron.Job, error) {
		registered, err := sc.Every(schedule.Every).SingletonMode().Do(schedule.Task, schedule.Params...)
		if err != nil {
			return "", nil, err
		}
		return schedule.ID, registered, nil
	}
}
// Scheduler wraps a gocron scheduler and keeps track of the jobs registered
// through it by ID.
type Scheduler struct {
// The embedded gocron scheduler; its methods are promoted onto Scheduler.
*gocron.Scheduler
// mu is held by AddJob, EmptyJobs and RemoveJob while they access jobs.
mu *sync.Mutex
// jobs maps a job ID to the gocron job registered under that ID.
jobs map[string]*gocron.Job
}
// Option registers a job on the given gocron scheduler. It returns the ID to
// track the job under and the created job, or an error if the job could not
// be registered.
type Option func(sc *gocron.Scheduler) (string, *gocron.Job, error)
// NewScheduler creates a gocron scheduler in UTC and applies every option to
// it, tracking each resulting job under the ID the option returns.
//
// If an option fails, its error is returned and no Scheduler is produced.
// If two options return the same ID, the later job replaces the earlier one:
// the earlier job is removed from the underlying scheduler so it cannot keep
// running without being reachable through RemoveJob or EmptyJobs.
func NewScheduler(options ...Option) (*Scheduler, error) {
	scheduler := gocron.NewScheduler(time.UTC)
	jobs := make(map[string]*gocron.Job, len(options))

	// No locking is needed here: nothing else can reference the scheduler or
	// the map until this function returns.
	for _, option := range options {
		id, job, err := option(scheduler)
		if err != nil {
			return nil, err
		}
		if previous, exists := jobs[id]; exists {
			scheduler.RemoveByReference(previous)
		}
		jobs[id] = job
	}

	return &Scheduler{
		Scheduler: scheduler,
		mu:        &sync.Mutex{},
		jobs:      jobs,
	}, nil
}
// AddJob applies each option to the underlying scheduler and tracks the
// resulting job under the ID the option returns.
//
// Options are applied in order. On the first failure the error is returned
// and the remaining options are skipped; jobs added before the failure stay
// registered. If an ID is already tracked, the new job replaces it: the
// previous job is removed from the underlying scheduler so it cannot keep
// running without being reachable through RemoveJob or EmptyJobs.
func (cs *Scheduler) AddJob(options ...Option) error {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	for _, option := range options {
		id, job, err := option(cs.Scheduler)
		if err != nil {
			return err
		}
		if previous, exists := cs.jobs[id]; exists {
			cs.Scheduler.RemoveByReference(previous)
		}
		cs.jobs[id] = job
	}
	return nil
}
// EmptyJobs removes every tracked job from the underlying scheduler and
// clears the ID index, all while holding the scheduler's mutex.
func (cs *Scheduler) EmptyJobs() {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	// Deleting the current key while ranging over a map is safe in Go.
	for key := range cs.jobs {
		cs.Scheduler.RemoveByReference(cs.jobs[key])
		delete(cs.jobs, key)
	}
}
// RemoveJob removes the job tracked under jobID from the underlying
// scheduler and from the ID index. An unknown jobID is a no-op.
func (cs *Scheduler) RemoveJob(jobID string) {
	cs.mu.Lock()
	defer cs.mu.Unlock()

	tracked, found := cs.jobs[jobID]
	if !found {
		return
	}
	cs.Scheduler.RemoveByReference(tracked)
	delete(cs.jobs, jobID)
}
// validateTask checks that task is a non-nil function and that params can be
// used as its arguments.
//
// It returns an error when:
//   - task is nil, including a nil function value such as `var f func()`;
//   - task is not a function;
//   - the number of params differs from the function's parameter count;
//   - a param is nil;
//   - a param's type is not assignable to the corresponding parameter type.
//
// It returns nil when the task and params are compatible.
func validateTask(task any, params []any) error {
	if task == nil {
		return fmt.Errorf("task cannot be nil")
	}

	fn := reflect.ValueOf(task)
	if fn.Kind() != reflect.Func {
		return fmt.Errorf("task is not a function")
	}
	// A typed nil function is a non-nil interface value of kind Func, so it
	// passes both checks above; calling it later would panic.
	if fn.IsNil() {
		return fmt.Errorf("task cannot be nil")
	}

	fnType := fn.Type()
	if got, want := len(params), fnType.NumIn(); got != want {
		return fmt.Errorf("number of parameters does not match the function, actual: %d, expected: %d", got, want)
	}

	for i, param := range params {
		// An untyped nil carries no type information to compare against the
		// function's parameter type, so it is rejected outright.
		if param == nil {
			return fmt.Errorf("nil parameter provided for function argument %d", i)
		}
		paramType := reflect.TypeOf(param)
		if want := fnType.In(i); !paramType.AssignableTo(want) {
			return fmt.Errorf("parameter #%d type %s does not match function's expected type %s", i, paramType.String(), want.String())
		}
	}
	return nil
}
package job
import (
"github.com/go-co-op/gocron"
"testing"
"time"
)
// TestScheduler checks that a job registered through NewScheduler is tracked
// by ID, that RemoveJob untracks it, and that removing it a second time is
// harmless.
func TestScheduler(t *testing.T) {
	const jobID = "1"

	noop := func() {}
	spec := Interval{
		ID:     jobID,
		Every:  2 * time.Second,
		Task:   noop,
		Params: []any{},
	}

	s, err := NewScheduler(WithInterval(spec))
	if err != nil {
		t.Fatalf("Expected no error, got: %v", err)
	}

	_, tracked := s.jobs[jobID]
	if !tracked {
		t.Fatal("Expected job to be added")
	}

	s.RemoveJob(jobID)
	_, tracked = s.jobs[jobID]
	if tracked {
		t.Fatal("Expected job to be removed")
	}

	// Removing an ID that is no longer tracked must not panic.
	s.RemoveJob(jobID)
}
// TestWithInterval is a table-driven test of the validation WithInterval
// performs: each case applies the resulting Option to a fresh scheduler and
// checks only whether an error is reported.
func TestWithInterval(t *testing.T) {
	takesInt := func(args int) {}

	cases := []struct {
		name    string
		params  Interval
		wantErr bool
	}{
		{
			name:    "validParams",
			params:  Interval{ID: "id5", Task: takesInt, Every: 3 * time.Second, Params: []any{1}},
			wantErr: false,
		},
		{
			name:    "emptyID",
			params:  Interval{ID: "", Task: takesInt, Every: 3 * time.Second, Params: []any{1}},
			wantErr: true,
		},
		{
			name:    "emptyTask",
			params:  Interval{ID: "id1", Task: nil, Every: 3 * time.Second},
			wantErr: true,
		},
		{
			name:    "invalidTaskType",
			params:  Interval{ID: "id2", Task: "not a func", Every: 3 * time.Second},
			wantErr: true,
		},
		{
			name:    "invalidParamType",
			params:  Interval{ID: "id5", Task: takesInt, Every: 3 * time.Second, Params: []any{"1"}},
			wantErr: true,
		},
		{
			name:    "invalidEvery",
			params:  Interval{ID: "id3", Task: takesInt, Every: 1 * time.Second, Params: []any{1}},
			wantErr: true,
		},
		{
			name:    "mismatchedParams",
			params:  Interval{ID: "id4", Task: takesInt, Every: 3 * time.Second, Params: []any{1, 2}},
			wantErr: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			sc := gocron.NewScheduler(time.UTC)
			defer sc.Stop()

			_, _, err := WithInterval(tc.params)(sc)
			gotErr := err != nil
			if gotErr != tc.wantErr {
				t.Errorf("WithInterval() error = %v, wantErr %v", err, tc.wantErr)
			}
		})
	}
}
// main registers the periodic "process_data" job, which calls Start with ctx
// every config.JobInterval, and starts the scheduler.
//
// NOTE(review): ctx, config, log and the job package are not defined in this
// snippet; they are presumably provided by the surrounding program.
func main() {
	processDataJob := job.Interval{
		ID:     "process_data",
		Every:  config.JobInterval,
		Task:   Start,
		Params: []any{ctx},
	}

	log.Debug().
		Any("job_id", processDataJob.ID).
		Str("every", config.JobInterval.String()).
		Msg("registering job schedule")

	scheduler, err := job.NewScheduler(job.WithInterval(processDataJob))
	if err != nil {
		log.Error().
			Err(err).Msg("failed to create scheduler")
		// main has no result parameters, so the error cannot be returned;
		// it has been logged above and main simply stops.
		return
	}

	// NOTE(review): gocron's StartAsync does not block, so something after
	// this call must keep the process alive for the job to run — confirm
	// against the full program.
	scheduler.StartAsync()
}
// Start is the task registered for the "process_data" job in main; it
// receives the context passed as the job's only parameter. The body is empty
// in this snippet.
func Start(ctx context.Context) {
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment