Skip to content

Instantly share code, notes, and snippets.

@d4l3k
Created December 27, 2017 21:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save d4l3k/191e3d9a03ad76ad3e51d10882667aa7 to your computer and use it in GitHub Desktop.
Save d4l3k/191e3d9a03ad76ad3e51d10882667aa7 to your computer and use it in GitHub Desktop.
firego worker queue tool
package main
import (
"fmt"
"log"
"github.com/pkg/errors"
"gopkg.in/zabawaba99/firego.v1"
)
const (
queueKey = "admin/queue"
)
type WorkerHandler func(ref *firego.Firebase) (interface{}, error)
type Queue struct {
handlers map[string]WorkerHandler
}
func NewQueue() *Queue {
return &Queue{
handlers: map[string]WorkerHandler{},
}
}
func (q *Queue) Handle(key string, f WorkerHandler) {
q.handlers[key] = f
}
func (q *Queue) Listen(db *firego.Firebase) error {
root, err := db.Ref(queueKey)
if err != nil {
return err
}
return root.ChildAdded(func(snapshot firego.DataSnapshot, previousChildKey string) {
_, started := snapshot.Child("started")
if started && false {
return
}
log.Printf("%+v %q", snapshot, previousChildKey)
typ, ok := snapshot.Child("type")
if !ok {
log.Printf("missing type: %+v", snapshot)
return
}
typeStr, ok := typ.Value.(string)
if !ok {
log.Printf("type not string: %+v", snapshot)
return
}
// check if we handle that type
if _, ok := q.handlers[typeStr]; !ok {
return
}
go func() {
if err := q.handleTask(root, typeStr, snapshot.Key); err != nil {
log.Printf("Worker error: %+v", err)
}
}()
})
}
type Task struct {
Type string `json:"type"`
Started bool `json:"started"`
Finished bool `json:"finished"`
}
var errAlreadyStarted = errors.New("already started")
func (q *Queue) handleTask(db *firego.Firebase, typ string, path string) error {
ref := db.Child(path)
startedRef := ref.Child("started")
claimed := false
if err := startedRef.Transaction(func(current interface{}) (interface{}, error) {
claimed = current == nil
if claimed {
return true, nil
}
return nil, errAlreadyStarted
}); err != nil {
return err
}
if !claimed {
return nil
}
defer func() {
if err := ref.Child("finished").Set(true); err != nil {
log.Println(err)
}
}()
log.Printf("running %q on %q", typ, path)
resp, err := q.handlers[typ](ref.Child("args"))
if err != nil {
if err := ref.Child("error").Set(fmt.Sprintf("%+v", err)); err != nil {
return err
}
return errors.Wrapf(err, "handler %q", typ)
}
if err := ref.Child("return").Set(resp); err != nil {
return err
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment