Last active
June 20, 2024 02:37
-
-
Save jasoet/39bd884e629deb4790ba7503f738c26e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package job | |
import ( | |
"fmt" | |
"github.com/go-co-op/gocron" | |
"reflect" | |
"strings" | |
"sync" | |
"time" | |
) | |
type Cron struct { | |
ID string | |
Expression string | |
Task any | |
Params []any | |
} | |
func WithCron(cron Cron) Option { | |
withError := func(err error) Option { | |
return func(sc *gocron.Scheduler) (string, *gocron.Job, error) { | |
return "", nil, err | |
} | |
} | |
if strings.TrimSpace(cron.ID) == "" { | |
err := fmt.Errorf("id cannot be blank") | |
return withError(err) | |
} | |
err := validateTask(cron.Task, cron.Params) | |
if err != nil { | |
return withError(err) | |
} | |
return func(sc *gocron.Scheduler) (string, *gocron.Job, error) { | |
job, err := sc.Cron(cron.Expression).SingletonMode().Do(cron.Task, cron.Params...) | |
if err != nil { | |
return "", nil, err | |
} | |
return cron.ID, job, err | |
} | |
} | |
type Interval struct { | |
ID string | |
Every time.Duration | |
Task any | |
Params []any | |
} | |
func WithInterval(schedule Interval) Option { | |
withError := func(err error) Option { | |
return func(sc *gocron.Scheduler) (string, *gocron.Job, error) { | |
return "", nil, err | |
} | |
} | |
if strings.TrimSpace(schedule.ID) == "" { | |
err := fmt.Errorf("id cannot be blank") | |
return withError(err) | |
} | |
err := validateTask(schedule.Task, schedule.Params) | |
if err != nil { | |
return withError(err) | |
} | |
if schedule.Every < 2*time.Second { | |
err := fmt.Errorf("duration cannot be less than 2s") | |
return withError(err) | |
} | |
return func(sc *gocron.Scheduler) (string, *gocron.Job, error) { | |
job, err := sc.Every(schedule.Every).SingletonMode().Do(schedule.Task, schedule.Params...) | |
if err != nil { | |
return "", nil, err | |
} | |
return schedule.ID, job, err | |
} | |
} | |
type Scheduler struct { | |
*gocron.Scheduler | |
mu *sync.Mutex | |
jobs map[string]*gocron.Job | |
} | |
type Option func(sc *gocron.Scheduler) (string, *gocron.Job, error) | |
func NewScheduler(options ...Option) (*Scheduler, error) { | |
jobs := make(map[string]*gocron.Job) | |
scheduler := gocron.NewScheduler(time.UTC) | |
mutex := &sync.Mutex{} | |
mutex.Lock() | |
defer mutex.Unlock() | |
for _, option := range options { | |
id, job, err := option(scheduler) | |
if err != nil { | |
return nil, err | |
} | |
jobs[id] = job | |
} | |
return &Scheduler{ | |
Scheduler: scheduler, | |
mu: mutex, | |
jobs: jobs, | |
}, nil | |
} | |
func (cs *Scheduler) AddJob(options ...Option) error { | |
cs.mu.Lock() | |
defer cs.mu.Unlock() | |
for _, option := range options { | |
id, job, err := option(cs.Scheduler) | |
if err != nil { | |
return err | |
} | |
cs.jobs[id] = job | |
} | |
return nil | |
} | |
func (cs *Scheduler) EmptyJobs() { | |
cs.mu.Lock() | |
defer cs.mu.Unlock() | |
for id, job := range cs.jobs { | |
cs.Scheduler.RemoveByReference(job) | |
delete(cs.jobs, id) | |
} | |
} | |
func (cs *Scheduler) RemoveJob(jobID string) { | |
cs.mu.Lock() | |
defer cs.mu.Unlock() | |
job, exists := cs.jobs[jobID] | |
if exists { | |
cs.Scheduler.RemoveByReference(job) | |
delete(cs.jobs, jobID) | |
} | |
} | |
func validateTask(task any, params []any) error { | |
taskReflect := reflect.ValueOf(task) | |
if task == nil { | |
err := fmt.Errorf("task cannot be nil") | |
return err | |
} | |
kind := taskReflect.Kind() | |
if kind != reflect.Func { | |
err := fmt.Errorf("task is not a function") | |
return err | |
} | |
taskType := taskReflect.Type() | |
if len(params) != taskType.NumIn() { | |
err := fmt.Errorf("number of parameters does not match the function, actual: %d, expected: %d", len(params), taskType.NumIn()) | |
return err | |
} | |
for i, param := range params { | |
if param == nil { | |
err := fmt.Errorf("nil parameter provided for function argument %d", i) | |
return err | |
} | |
paramType := reflect.ValueOf(param).Type() | |
if !paramType.AssignableTo(taskType.In(i)) { | |
err := fmt.Errorf("parameter #%d type %s does not match function's expected type %s", i, paramType.String(), taskType.In(i).String()) | |
return err | |
} | |
} | |
return nil | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package job | |
import ( | |
"github.com/go-co-op/gocron" | |
"testing" | |
"time" | |
) | |
func TestScheduler(t *testing.T) { | |
testFunc := func() {} | |
testJob := Interval{ID: "1", Every: 2 * time.Second, Task: testFunc, Params: []interface{}{}} | |
s, err := NewScheduler(WithInterval(testJob)) | |
if err != nil { | |
t.Fatalf("Expected no error, got: %v", err) | |
} | |
if _, exists := s.jobs["1"]; !exists { | |
t.Fatal("Expected job to be added") | |
} | |
s.RemoveJob("1") | |
if _, exists := s.jobs["1"]; exists { | |
t.Fatal("Expected job to be removed") | |
} | |
s.RemoveJob("1") // Test removing nonexistent job (should not panic) | |
} | |
func TestWithInterval(t *testing.T) { | |
validTask := func(args int) {} | |
invalidTask := "not a func" | |
emptyID := Interval{ID: "", Task: validTask, Every: 3 * time.Second, Params: []any{1}} | |
emptyTask := Interval{ID: "id1", Task: nil, Every: 3 * time.Second} | |
invalidTaskType := Interval{ID: "id2", Task: invalidTask, Every: 3 * time.Second} | |
invalidEvery := Interval{ID: "id3", Task: validTask, Every: 1 * time.Second, Params: []any{1}} | |
mismatchedParams := Interval{ID: "id4", Task: validTask, Every: 3 * time.Second, Params: []any{1, 2}} | |
validSchedule := Interval{ID: "id5", Task: validTask, Every: 3 * time.Second, Params: []any{1}} | |
invalidParamType := Interval{ID: "id5", Task: validTask, Every: 3 * time.Second, Params: []any{"1"}} | |
tests := []struct { | |
name string | |
params Interval | |
wantErr bool | |
}{ | |
{"validParams", validSchedule, false}, | |
{"emptyID", emptyID, true}, | |
{"emptyTask", emptyTask, true}, | |
{"invalidTaskType", invalidTaskType, true}, | |
{"invalidParamType", invalidParamType, true}, | |
{"invalidEvery", invalidEvery, true}, | |
{"mismatchedParams", mismatchedParams, true}, | |
} | |
for _, tt := range tests { | |
t.Run(tt.name, func(t *testing.T) { | |
scheduler := gocron.NewScheduler(time.UTC) | |
defer scheduler.Stop() | |
_, _, err := WithInterval(tt.params)(scheduler) | |
if (err != nil) != tt.wantErr { | |
t.Errorf("WithInterval() error = %v, wantErr %v", err, tt.wantErr) | |
return | |
} | |
}) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func main() { | |
processDataJob := job.Interval{ | |
ID: "process_data", | |
Every: config.JobInterval, | |
Task: Start, | |
Params: []any{ctx}, | |
} | |
log.Debug(). | |
Any("job_id", processDataJob.ID). | |
Str("every", config.JobInterval.String()). | |
Msgf("registering job schedule") | |
scheduler, err := job.NewScheduler(job.WithInterval(processDataJob)) | |
if err != nil { | |
log.Error(). | |
Err(err).Msg("failed to create scheduler") | |
return err | |
} | |
scheduler.StartAsync() | |
} | |
func Start(ctx context.Context) { | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment