Skip to content

Instantly share code, notes, and snippets.

@gammazero
Last active October 29, 2020 21:11
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gammazero/bb543631f25a73d1a3f6c8e1a6970eb8 to your computer and use it in GitHub Desktop.
Save gammazero/bb543631f25a73d1a3f6c8e1a6970eb8 to your computer and use it in GitHub Desktop.
Track workerpool job completion
package main
import (
"context"
"errors"
"fmt"
"time"
"github.com/gammazero/workerpool"
)
// myFunction stands in for the real per-input work submitted to the runner.
// It returns "<name> is good" for most names, rejects "Fedor" with an error,
// and sleeps for 25 seconds before answering for "Lyudmilla".
func myFunction(name string) (string, error) {
	switch name {
	case "Fedor":
		// Fedor is rejected outright.
		return "", errors.New("not authorized")
	case "Lyudmilla":
		// Lyudmilla is deliberately slow so that a caller's timeout fires first.
		time.Sleep(25 * time.Second)
	}
	return name + " is good", nil
}
// main demonstrates the runner: it submits one job per name, waits up to one
// second for the results, then prints each job's data or error by ID.
func main() {
	idNameMap := map[int]string{
		101: "Anna",
		212: "Artyom",
		323: "Fedor",
		434: "Katya",
		545: "Lyudmilla",
	}

	runner := NewRunner(20)

	// Queue myFunction once per entry, keyed by the entry's ID.
	for id, name := range idNameMap {
		who := name // per-iteration copy for the closure (required before Go 1.22)
		runner.Run(id, func() (interface{}, error) {
			return myFunction(who)
		})
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Block until every job has reported, or until the timeout expires.
	if err := runner.Wait(ctx); err != nil {
		fmt.Println("did not get all results:", err)
	}

	// Look up each job's outcome by ID.
	for id := range idNameMap {
		data, err := runner.GetResult(id)
		if err != nil {
			fmt.Println(id, "error:", err)
			continue
		}
		fmt.Println(id, "data:", data.(string))
	}

	// Ask for an ID that was never submitted.
	missingID := 999
	if _, err := runner.GetResult(missingID); err != nil {
		fmt.Println(missingID, "error:", err)
	}
}
// ----- Everything below should be in a separate "runner" package -----
// Sentinel errors returned by Runner methods.
var (
// ErrAlreadyRunning is returned by Run when the given id is already in the
// set of jobs whose results have not been collected yet.
ErrAlreadyRunning = errors.New("job with same id is already running")
// ErrNoSuchJob is returned by GetResult when the id has neither a stored
// result nor an entry in the set of outstanding jobs.
ErrNoSuchJob = errors.New("no such job")
)
// Runner submits functions to a worker pool and tracks their results by
// integer job ID.
//
// NOTE(review): the maps below are read and written without any locking
// visible in this file, so a Runner presumably expects Run, Wait and
// GetResult to be called from a single goroutine — confirm before sharing.
type Runner struct {
// resultChan carries each finished job's result from the worker to Wait.
resultChan chan result
// results holds collected results keyed by job ID; it is filled in by Wait.
results map[int]result
// waitErr is the context error recorded when Wait stops before all results
// have arrived.
waitErr error
// waitSet is the set of job IDs submitted through Run whose results have
// not been collected yet.
waitSet map[int]struct{}
// wp is the worker pool that submitted functions are handed to.
wp *workerpool.WorkerPool
}
// result is the outcome of a single job. Check for unkeyed composite literals
// of this type before reordering its fields.
type result struct {
// id is the job ID the result belongs to.
id int
// data is the value returned by the job's function.
data interface{}
// err is the error returned by the job's function.
err error
}
// NewRunner returns a ready-to-use Runner backed by a new worker pool created
// with the given workers value. The set of outstanding jobs starts empty and
// the result channel is unbuffered.
func NewRunner(workers int) *Runner {
	r := new(Runner)
	r.resultChan = make(chan result)
	r.waitSet = make(map[int]struct{})
	r.wp = workerpool.New(workers)
	return r
}
// Run registers id as outstanding and submits f to the worker pool. When f
// returns, its data and error are sent on the runner's result channel tagged
// with id, to be picked up by Wait.
//
// It returns ErrAlreadyRunning, without submitting anything, if id is already
// outstanding; otherwise it returns nil.
func (r *Runner) Run(id int, f func() (interface{}, error)) error {
	_, outstanding := r.waitSet[id]
	if outstanding {
		return ErrAlreadyRunning
	}

	job := func() {
		data, err := f()
		// Hand the outcome over to whoever is receiving in Wait.
		r.resultChan <- result{id, data, err}
	}

	r.waitSet[id] = struct{}{}
	r.wp.Submit(job)
	return nil
}
// Wait collects job results until every outstanding job has reported, or
// until ctx is canceled or times out.
//
// Results are stored for later retrieval with GetResult. Results gathered by
// an earlier call are kept, so Wait may be called again (for example with a
// fresh context after a timeout) without losing what was already collected.
//
// It returns nil once no jobs are outstanding. If ctx ends first, the context
// error is recorded on the runner and returned wrapped, so callers can test
// it with errors.Is(err, context.DeadlineExceeded) or context.Canceled.
func (r *Runner) Wait(ctx context.Context) error {
	// Allocate only once: re-making the map on every call would throw away
	// results collected by a previous Wait.
	if r.results == nil {
		r.results = make(map[int]result, len(r.waitSet))
	}

	for len(r.waitSet) > 0 {
		select {
		case res := <-r.resultChan:
			if _, ok := r.waitSet[res.id]; !ok {
				fmt.Println("Unexpected response ID:", res.id)
				continue
			}
			delete(r.waitSet, res.id)
			r.results[res.id] = res
		case <-ctx.Done():
			// Context canceled or timed out. Remember why, so GetResult can
			// report it for the jobs that are still outstanding.
			r.waitErr = ctx.Err()
			return fmt.Errorf("%w while waiting for %d results", r.waitErr,
				len(r.waitSet))
		}
	}
	return nil
}
// ErrNotFinished is returned by GetResult for a job that has been submitted
// but whose result has not been collected, when no Wait error is recorded.
var ErrNotFinished = errors.New("job has not finished")

// GetResult returns the outcome of the job that was run with the given id.
//
// If a result has been collected, it returns the data and error produced by
// the job's function. Otherwise it returns nil data and:
//   - ErrNoSuchJob if id is not an outstanding job,
//   - the context error recorded by Wait if one is recorded,
//   - ErrNotFinished if the job is outstanding and no Wait error is recorded
//     (previously this case returned a nil error, which was indistinguishable
//     from a successful job that produced nil data).
func (r *Runner) GetResult(id int) (interface{}, error) {
	if res, ok := r.results[id]; ok {
		return res.data, res.err
	}
	if _, outstanding := r.waitSet[id]; !outstanding {
		return nil, ErrNoSuchJob
	}
	if r.waitErr != nil {
		return nil, r.waitErr
	}
	return nil, ErrNotFinished
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment