Skip to content

Instantly share code, notes, and snippets.

@panzerdev
Last active January 15, 2019 12:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save panzerdev/d90a1be6125ef31bbf4d18d0a1bf089d to your computer and use it in GitHub Desktop.
Save panzerdev/d90a1be6125ef31bbf4d18d0a1bf089d to your computer and use it in GitHub Desktop.
A worker based on sql with distributed locking using the DB
package main
import (
"context"
"database/sql"
"errors"
"github.com/jmoiron/sqlx"
"log"
"math/rand"
"time"
)
/*
CREATE TABLE `worker` (
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT,
`payload` varchar(255) not null,
`claimed_at` DATETIME NULL,
`handled_at` DATETIME NULL,
`error` VARCHAR(255) NULL,
`date_created` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
`date_modified` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3)
ON UPDATE CURRENT_TIMESTAMP(3)
)
ENGINE = InnoDB
DEFAULT CHARSET = utf8;
*/
var db *sqlx.DB
func startWork() {
// just for testing
nr := rand.Int()
sem := make(chan struct{}, 2)
for {
// get open items from DB
openWork, err := getOpenWorkItems()
if err != nil {
// sleep for a few sec if there are no rows at all
time.Sleep(time.Second * 2)
continue
}
// iterate over them and try to get claim
for _, id := range openWork {
log.Println(nr, "starting to work on", id)
sem <- struct{}{}
err := aquireClaim(id)
if err != nil {
log.Println(nr, "no claim for", id, err)
<-sem
// already claimed
continue
}
log.Println(nr, "claim for", id)
// when claim is successful start go routine if not all workers are busy
go handleClaimedWork(sem, id)
}
}
}
func handleClaimedWork(sem <-chan struct{}, id int) {
defer func() {
// free worker spot again
<-sem
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go handleClaim(ctx, id)
workload, err := getWorkload(id)
if err != nil {
// log err
return
}
err = work(workload)
if err != nil {
// log err
// set error on row
log.Println("Error occurred for id", id, err)
clearClaim(id)
return
}
err = setWorkDone(id)
if err != nil {
// log err
// set error on row
return
}
log.Println("Work done for", id)
}
func work(payload []byte) error {
log.Println("Payload", string(payload), "started")
time.Sleep(time.Duration(rand.Int63n(10)) * time.Second)
if rand.Int()%2 == 0 {
return nil
}
return errors.New("Some error")
}
func handleClaim(ctx context.Context, id int) {
// endless loop with possibility to cancel
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
//clearClaim(id) ???
return
case <-ticker.C:
updateClaim(id)
// maybe check for errors and try to cancel work?
}
}
}
// DB stuff
func getOpenWorkItems() ([]int, error) {
// get all rows with a limit of 50 or so that have no claimed_at set or older then x min
// and handled_at NULL
// maybe an error column or table? retry count?
var ids []int
err := db.Select(&ids, `SELECT id FROM worker WHERE
(claimed_at IS NULL OR claimed_at < DATE_SUB(now(), INTERVAL 1 MINUTE))
AND
handled_at IS NULL
LIMIT 20;`)
if len(ids) == 0 {
return nil, sql.ErrNoRows
}
return ids, err
}
func aquireClaim(id int) error {
// try to update the row and claim it to work on it
result, err := db.Exec(`UPDATE worker
SET claimed_at=NOW()
WHERE id = ?
AND handled_at IS NULL
AND (claimed_at IS NULL OR claimed_at < DATE_SUB(now(), INTERVAL 1 MINUTE));`, id)
if err != nil {
return err
}
nr, err := result.RowsAffected()
if err != nil {
return err
}
if nr == 0 {
return errors.New("Claiming failed")
}
return nil
}
func setWorkDone(id int) error {
_, err := db.Exec(`UPDATE worker SET handled_at=NOW() WHERE id = ?;`, id)
return err
}
func getWorkload(id int) ([]byte, error) {
var payload string
err := db.Get(&payload, `SELECT payload from worker WHERE id = ?;`, id)
return []byte(payload), err
}
func updateClaim(id int) error {
_, err := db.Exec(`UPDATE worker SET claimed_at=NOW() WHERE id = ?;`, id)
return err
}
func clearClaim(id int) {
// to clear or not to clear.. that is the question
db.Exec(`UPDATE worker SET claimed_at=null WHERE id = ?;`, id)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment