Last active
January 15, 2019 12:43
-
-
Save panzerdev/d90a1be6125ef31bbf4d18d0a1bf089d to your computer and use it in GitHub Desktop.
A SQL-backed job worker that uses the database itself for distributed locking (claiming) of work items.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"database/sql" | |
"errors" | |
"github.com/jmoiron/sqlx" | |
"log" | |
"math/rand" | |
"time" | |
) | |
/* | |
CREATE TABLE `worker` ( | |
`id` INT UNSIGNED NOT NULL PRIMARY KEY AUTO_INCREMENT, | |
`payload` varchar(255) not null, | |
`claimed_at` DATETIME NULL, | |
`handled_at` DATETIME NULL, | |
`error` VARCHAR(255) NULL, | |
`date_created` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3), | |
`date_modified` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) | |
ON UPDATE CURRENT_TIMESTAMP(3) | |
) | |
ENGINE = InnoDB | |
DEFAULT CHARSET = utf8; | |
*/ | |
var db *sqlx.DB | |
func startWork() { | |
// just for testing | |
nr := rand.Int() | |
sem := make(chan struct{}, 2) | |
for { | |
// get open items from DB | |
openWork, err := getOpenWorkItems() | |
if err != nil { | |
// sleep for a few sec if there are no rows at all | |
time.Sleep(time.Second * 2) | |
continue | |
} | |
// iterate over them and try to get claim | |
for _, id := range openWork { | |
log.Println(nr, "starting to work on", id) | |
sem <- struct{}{} | |
err := aquireClaim(id) | |
if err != nil { | |
log.Println(nr, "no claim for", id, err) | |
<-sem | |
// already claimed | |
continue | |
} | |
log.Println(nr, "claim for", id) | |
// when claim is successful start go routine if not all workers are busy | |
go handleClaimedWork(sem, id) | |
} | |
} | |
} | |
func handleClaimedWork(sem <-chan struct{}, id int) { | |
defer func() { | |
// free worker spot again | |
<-sem | |
}() | |
ctx, cancel := context.WithCancel(context.Background()) | |
defer cancel() | |
go handleClaim(ctx, id) | |
workload, err := getWorkload(id) | |
if err != nil { | |
// log err | |
return | |
} | |
err = work(workload) | |
if err != nil { | |
// log err | |
// set error on row | |
log.Println("Error occurred for id", id, err) | |
clearClaim(id) | |
return | |
} | |
err = setWorkDone(id) | |
if err != nil { | |
// log err | |
// set error on row | |
return | |
} | |
log.Println("Work done for", id) | |
} | |
// work is a stand-in for real processing, used for testing the worker.
//
// It logs the payload, sleeps for a random whole number of seconds in [0, 9],
// and then returns nil or an error with roughly equal probability.
func work(payload []byte) error {
	log.Println("Payload", string(payload), "started")
	delay := time.Second * time.Duration(rand.Int63n(10))
	time.Sleep(delay)
	if rand.Int()%2 != 0 {
		return errors.New("Some error")
	}
	return nil
}
func handleClaim(ctx context.Context, id int) { | |
// endless loop with possibility to cancel | |
ticker := time.NewTicker(time.Second) | |
defer ticker.Stop() | |
for { | |
select { | |
case <-ctx.Done(): | |
//clearClaim(id) ??? | |
return | |
case <-ticker.C: | |
updateClaim(id) | |
// maybe check for errors and try to cancel work? | |
} | |
} | |
} | |
// DB stuff | |
func getOpenWorkItems() ([]int, error) { | |
// get all rows with a limit of 50 or so that have no claimed_at set or older then x min | |
// and handled_at NULL | |
// maybe an error column or table? retry count? | |
var ids []int | |
err := db.Select(&ids, `SELECT id FROM worker WHERE | |
(claimed_at IS NULL OR claimed_at < DATE_SUB(now(), INTERVAL 1 MINUTE)) | |
AND | |
handled_at IS NULL | |
LIMIT 20;`) | |
if len(ids) == 0 { | |
return nil, sql.ErrNoRows | |
} | |
return ids, err | |
} | |
func aquireClaim(id int) error { | |
// try to update the row and claim it to work on it | |
result, err := db.Exec(`UPDATE worker | |
SET claimed_at=NOW() | |
WHERE id = ? | |
AND handled_at IS NULL | |
AND (claimed_at IS NULL OR claimed_at < DATE_SUB(now(), INTERVAL 1 MINUTE));`, id) | |
if err != nil { | |
return err | |
} | |
nr, err := result.RowsAffected() | |
if err != nil { | |
return err | |
} | |
if nr == 0 { | |
return errors.New("Claiming failed") | |
} | |
return nil | |
} | |
func setWorkDone(id int) error { | |
_, err := db.Exec(`UPDATE worker SET handled_at=NOW() WHERE id = ?;`, id) | |
return err | |
} | |
func getWorkload(id int) ([]byte, error) { | |
var payload string | |
err := db.Get(&payload, `SELECT payload from worker WHERE id = ?;`, id) | |
return []byte(payload), err | |
} | |
func updateClaim(id int) error { | |
_, err := db.Exec(`UPDATE worker SET claimed_at=NOW() WHERE id = ?;`, id) | |
return err | |
} | |
func clearClaim(id int) { | |
// to clear or not to clear.. that is the question | |
db.Exec(`UPDATE worker SET claimed_at=null WHERE id = ?;`, id) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment