Skip to content

Instantly share code, notes, and snippets.

@Loschcode
Created April 20, 2024 01:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Loschcode/9a9f643fe5086b3585d9c3507f2214b8 to your computer and use it in GitHub Desktop.
Save Loschcode/9a9f643fe5086b3585d9c3507f2214b8 to your computer and use it in GitHub Desktop.
Job scheduler in Golang
package job_scheduler
import (
"context"
"database/sql/driver"
"encoding/json"
"fmt"
"log"
"time"
"aquiestoy/pkg/mailer"
"aquiestoy/pkg/tracking"
"github.com/getsentry/sentry-go"
"github.com/go-co-op/gocron"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/redis/go-redis/v9"
"gorm.io/gorm"
)
// Tuning knobs for the job scheduler.
const (
// maxConcurrency is the LIMIT applied to the query in checkAndRunJobs, i.e.
// the maximum number of due jobs picked up by a single poll. Each picked job
// is then run in its own goroutine.
maxConcurrency = 10
// fetchFrequencyInSeconds is the intended number of seconds between two
// polls of the scheduled jobs table.
fetchFrequencyInSeconds = 10
)
// Clients bundles the external dependencies owned by the scheduler. Job
// functions receive the *Scheduler and can reach these clients through it.
type Clients struct {
// Db is the GORM handle used to insert, query, update and delete
// ScheduledJob rows.
Db *gorm.DB
// Redis is not used by the scheduler code visible here; presumably it is
// carried for the job functions — TODO confirm.
Redis *redis.Client
// Tracking receives an anonymous event whenever a job function returns an
// error (see runJob).
Tracking *tracking.Tracking
// Mailer is not used by the scheduler code visible here; presumably it is
// carried for the job functions — TODO confirm.
Mailer *mailer.Mailer
}
// Scheduler persists jobs in the database and periodically runs the ones
// that are due.
type Scheduler struct {
// Clients holds the external dependencies (database, tracking, ...).
Clients Clients
// JobsMap resolves the FunctionName stored on a ScheduledJob to the Go
// function that executes it.
JobsMap JobsMap
}
// JobsMap maps a job function name to its implementation. Each function
// receives the scheduler that runs it and the job's arguments, and returns an
// error when the job failed.
type JobsMap map[string]func(*Scheduler, map[string]interface{}) error
// ScheduledJob is the database record describing one deferred function call.
type ScheduledJob struct {
// ID is the primary key; per the gorm tag the database generates it with
// uuid_generate_v4() when it is left unset.
ID *uuid.UUID `json:"id,omitempty" gorm:"type:uuid;default:uuid_generate_v4()" db:"id"`
// ScheduledFor is the instant after which the job may be picked up.
ScheduledFor time.Time
// FunctionName is the key looked up in the scheduler's JobsMap.
FunctionName string
// Arguments is the payload passed to the job function.
Arguments Arguments
// Status is the lifecycle state; the code in this file writes "wait",
// "pending" and "fail".
Status string
// CreatedAt and UpdatedAt are presumably maintained by GORM's timestamp
// convention — TODO confirm.
CreatedAt time.Time
UpdatedAt time.Time
}
// Arguments is the free-form payload attached to a scheduled job. It
// implements driver.Valuer and sql.Scanner so that it can be written to, and
// read back from, a JSON/JSONB column.
//
// If the same JSONB handling is needed in more places in the code, consider
// renaming this type to something generic such as "JSONB".
type Arguments map[string]interface{}

// Value implements driver.Valuer by encoding the map as JSON. A nil map is
// encoded as the JSON literal null.
func (args Arguments) Value() (driver.Value, error) {
	return json.Marshal(args)
}

// Scan implements sql.Scanner by decoding a JSON object into the map.
//
// Accepted sources are []byte and string (drivers differ in how they hand
// JSON columns back) as well as nil for SQL NULL. Both SQL NULL and the JSON
// literal null yield a nil map, so whatever Value wrote for a nil map can be
// read back. Any other source type, or JSON that is not an object, is
// reported as an error.
func (args *Arguments) Scan(src interface{}) error {
	var raw []byte
	switch v := src.(type) {
	case nil:
		// SQL NULL: the row carries no payload at all.
		*args = nil
		return nil
	case []byte:
		raw = v
	case string:
		raw = []byte(v)
	default:
		return fmt.Errorf("type assertion when processing JSONB failed (1)")
	}

	var decoded interface{}
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return err
	}
	if decoded == nil {
		// JSON null is exactly what Value produces for a nil map.
		*args = nil
		return nil
	}

	object, ok := decoded.(map[string]interface{})
	if !ok {
		return fmt.Errorf("type assertion when processing JSONB failed (2)")
	}
	*args = object
	return nil
}
// NewScheduler builds a Scheduler from the given clients and job registry.
// It logs that the scheduler is about to be launched; nothing is started
// until Run is called. The returned error is always nil.
func NewScheduler(clients Clients, jobsMap JobsMap) (*Scheduler, error) {
	log.Println("About to launch job scheduler.")

	scheduler := new(Scheduler)
	scheduler.Clients = clients
	scheduler.JobsMap = jobsMap
	return scheduler, nil
}
// AddJob persists a new job in the "wait" state so that it is picked up once
// scheduled_for has passed. functionName must match a key of the scheduler's
// JobsMap for the job to be runnable; arguments is handed to that function.
//
// On insert failure the error is sent to Sentry from here, because callers
// usually ignore it so as not to block their own flow, and is then returned.
func (schdl *Scheduler) AddJob(scheduled_for time.Time, functionName string, arguments map[string]interface{}) error {
	record := ScheduledJob{
		Status:       "wait",
		FunctionName: functionName,
		ScheduledFor: scheduled_for,
		Arguments:    arguments,
	}

	insertErr := schdl.Clients.Db.Create(&record).Error
	if insertErr == nil {
		log.Printf("A new job %s has been inserted.\n", functionName)
		return nil
	}

	sentry.CaptureException(insertErr)
	return insertErr
}
// Run starts the periodic job checker and then blocks the calling goroutine
// forever, keeping the process alive while the cron scheduler works in the
// background.
//
// If the checker cannot be started, the error is logged, sent to Sentry and
// Run returns: blocking forever with nothing scheduled would hide the
// failure.
func (schdl *Scheduler) Run() {
	// periodicJobChecker only registers the task and starts the cron scheduler
	// asynchronously, so it is called inline to be able to observe its error.
	if err := schdl.periodicJobChecker(); err != nil {
		sentry.CaptureException(err)
		log.Printf("Job scheduler could not be started: %s", err)
		return
	}
	select {}
}
// runJob executes the function registered under job.FunctionName with the
// job's arguments and records the outcome in the database.
//
//   - Unknown function name: the job is marked "fail" (it would otherwise stay
//     in its current state forever) and an error is returned.
//   - The function returns an error: the job is marked "fail", the error is
//     sent to Sentry, logged, pushed as an anonymous tracking event and
//     returned.
//   - Success: the job row is deleted; a failure to delete it is reported and
//     returned.
func (schdl *Scheduler) runJob(job *ScheduledJob) error {
	// markFailed flags the row as failed; a database error at this point can
	// only be reported, there is nothing left to roll back.
	markFailed := func() {
		if err := schdl.Clients.Db.Model(job).Update("status", "fail").Error; err != nil {
			sentry.CaptureException(err)
			log.Printf("Job %s could not be marked as failed: %s", job.FunctionName, err)
		}
	}

	customFunction, exists := schdl.JobsMap[job.FunctionName]
	if !exists {
		log.Printf("Function %s not found", job.FunctionName)
		markFailed()
		return fmt.Errorf("job function %q not found", job.FunctionName)
	}

	if err := customFunction(schdl, job.Arguments); err != nil {
		markFailed()
		sentry.CaptureException(err)
		readError := fmt.Sprintf("Job %s failed: %s", job.FunctionName, err)
		log.Print(readError)
		schdl.Clients.Tracking.AddAnonymousEvent(context.Background(), readError, job.Arguments)
		return err
	}

	// A completed job is removed rather than kept with a "done" status.
	if err := schdl.Clients.Db.Delete(job).Error; err != nil {
		sentry.CaptureException(err)
		log.Printf("Job %s succeeded but could not be deleted: %s", job.FunctionName, err)
		return fmt.Errorf("deleting completed job %s: %w", job.FunctionName, err)
	}
	return nil
}
// periodicJobChecker registers checkAndRunJobs on a UTC cron scheduler that
// fires every fetchFrequencyInSeconds seconds and starts that scheduler
// asynchronously, so this function returns right away. It returns the error
// raised while registering the task, if any.
//
// NOTE(review): checkAndRunJobs returns an error; how gocron surfaces it is
// not visible from this file — confirm it is not silently dropped.
//
// TODO: job statuses should cover wait, pending, fail and timeout ("timeout"
// is not written by the code visible here).
func (schdl *Scheduler) periodicJobChecker() error {
	cronScheduler := gocron.NewScheduler(time.UTC)

	// Use the package constant instead of a second, hard-coded copy of the
	// polling interval.
	if _, err := cronScheduler.Every(fetchFrequencyInSeconds).Seconds().Do(schdl.checkAndRunJobs); err != nil {
		return err
	}

	cronScheduler.StartAsync()
	log.Println("CRON Job has been started asynchronously")
	return nil
}
// checkAndRunJobs loads up to maxConcurrency jobs that are in the "wait"
// state and whose scheduled time has passed, marks each of them "pending" and
// runs it in its own goroutine. It returns an error only when the jobs cannot
// be fetched; per-job problems are reported and do not stop the batch.
//
// NOTE(review): fetching and marking are two separate statements without row
// locking, so several scheduler processes polling the same table could pick
// the same job — confirm only one instance runs.
func (schdl *Scheduler) checkAndRunJobs() error {
	var jobs []*ScheduledJob
	now := time.Now()

	result := schdl.Clients.Db.
		Where("scheduled_for < ? AND status = ?", now, "wait").
		Limit(maxConcurrency).
		Find(&jobs)
	if result.Error != nil {
		return fmt.Errorf("fetching due jobs: %w", result.Error)
	}

	if len(jobs) > 0 {
		log.Printf("%v jobs will be processed\n", len(jobs))
	}

	for _, job := range jobs {
		// Re-check the due time before touching the row: a job that is not
		// due must stay in "wait" so a later poll can pick it up, instead of
		// being stranded in "pending".
		if !job.ScheduledFor.Before(now) {
			continue
		}

		// If the row cannot be flagged it would still be "wait" on the next
		// poll and run a second time, so skip it for now.
		if err := schdl.Clients.Db.Model(job).Update("status", "pending").Error; err != nil {
			sentry.CaptureException(err)
			log.Printf("Job %s could not be marked as pending, skipping: %s", job.FunctionName, err)
			continue
		}

		go func(job *ScheduledJob) {
			// recover keeps a panic raised by a job function from spreading
			// outside of this goroutine and taking the process down.
			defer func() {
				if r := recover(); r != nil {
					log.Printf("Job %s just crashed\n", job.FunctionName)
					sentry.CaptureMessage(fmt.Sprintf("Job crashed %s but we don't have the trace, it's likely a job internals issue", job.FunctionName))
					if err := schdl.Clients.Db.Model(job).Update("status", "fail").Error; err != nil {
						sentry.CaptureException(err)
						log.Printf("Job %s could not be marked as failed: %s", job.FunctionName, err)
					}
				}
			}()
			// runJob reports its own failures, so the error is not needed here.
			_ = schdl.runJob(job)
		}(job)
	}
	return nil
}
package jobs
import (
"aquiestoy/internal/models"
"aquiestoy/internal/services"
"aquiestoy/pkg/job_scheduler"
"aquiestoy/pkg/tracking"
"context"
"fmt"
"time"
"github.com/google/uuid"
"gorm.io/gorm"
)
// ServicesClients adapts the scheduler's dependencies to the services.Clients
// shape, additionally exposing the scheduler itself as JobScheduler so that
// services can enqueue further jobs.
func ServicesClients(schdl *job_scheduler.Scheduler) services.Clients {
	deps := schdl.Clients

	return services.Clients{
		JobScheduler: schdl,
		Mailer:       deps.Mailer,
		Tracking:     deps.Tracking,
		Redis:        deps.Redis,
		Db:           deps.Db,
	}
}
// Map returns the registry of job functions known to the scheduler, keyed by
// function name. Every job that should be available to the scheduler must be
// added here.
func Map() job_scheduler.JobsMap {
	registry := make(job_scheduler.JobsMap)
	registry["ping"] = ping
	return registry
}
// ping is a no-op job: it ignores the scheduler and its arguments and always
// succeeds. It is registered under the "ping" key in Map.
func ping(schdl *job_scheduler.Scheduler, args map[string]interface{}) error {
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment