Skip to content

Instantly share code, notes, and snippets.

@dillonstreator
Last active October 7, 2021 14:16
Show Gist options
  • Save dillonstreator/ef0e9b3583f53e445688810d1e4972a5 to your computer and use it in GitHub Desktop.
Save dillonstreator/ef0e9b3583f53e445688810d1e4972a5 to your computer and use it in GitHub Desktop.
Concurrently process items using a bounded channel: stores job results, shuts down gracefully, picks up where it left off on the next run, and captures all errors.
package main
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"math/rand"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
"github.com/avast/retry-go"
"github.com/pkg/errors"
)
// Tunables for the demo job and the keys used in the persisted payload.
const (
// itemCount is the number of items main generates ("1" through "100").
itemCount = 100
// maxConcurrentItems is the capacity of the bounded channel in main, i.e. the
// maximum number of items processed at the same time.
maxConcurrentItems = 15
// maxPerItemRetries is passed to retry.Attempts for each item.
maxPerItemRetries = 3
// maxRetryJitterInMs is passed (as milliseconds) to retry.MaxJitter.
maxRetryJitterInMs = 2500
// maxProcessWaitTimeInMs is the exclusive upper bound, in milliseconds, of
// the random sleep performed by process.
maxProcessWaitTimeInMs = 3000
// failureRate is the threshold process compares rand.Float32() against: a
// step fails when the random value is >= failureRate.
// NOTE(review): despite the name, this acts as a per-step success threshold
// (a step fails roughly 5% of the time with .95) — confirm the intent.
failureRate = .95
// payloadKeyCompletedAt is the payload key main sets once every item has
// been processed; its presence makes a later run exit immediately.
payloadKeyCompletedAt = "completedAt"
// payloadKeyCompletedMap is the payload key holding the map of item -> marker
// for items that were already processed successfully.
payloadKeyCompletedMap = "completedMap"
)
// main runs a resumable batch job over itemCount items.
//
// It loads the persisted payload, skips items already recorded under
// payloadKeyCompletedMap, and processes the rest with at most
// maxConcurrentItems goroutines in flight. Each successfully processed item is
// recorded and the payload is saved right away, so a later run can pick up
// where this one stopped. SIGINT/SIGTERM cancel the context; a cancelled
// context or any processing error stops new work from being scheduled while
// in-flight items are allowed to finish. All collected errors are reported via
// log.Fatal; when there are none, payloadKeyCompletedAt is set and saved.
func main() {
	rand.Seed(time.Now().Unix())

	payload, err := getPayload()
	if err != nil {
		log.Fatal(err)
	}
	if payload == nil {
		// Writing into a nil map panics; start from an empty payload instead.
		payload = map[string]interface{}{}
	}
	if _, ok := payload[payloadKeyCompletedAt]; ok {
		log.Fatal("job already completed")
	}

	// completed aliases the map stored inside payload, so marking an item here
	// is exactly what setPayload(payload) persists.
	completed, ok := payload[payloadKeyCompletedMap].(map[string]interface{})
	if !ok {
		completed = map[string]interface{}{}
		payload[payloadKeyCompletedMap] = completed
	}

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		ch := make(chan os.Signal, 1)
		signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
		<-ch
		cancel()
	}()

	items := make([]string, itemCount)
	for i := 0; i < itemCount; i++ {
		items[i] = strconv.Itoa(i + 1)
	}

	// boundedCh is a counting semaphore: a slot is taken before a worker is
	// started and released when that worker returns.
	boundedCh := make(chan struct{}, maxConcurrentItems)

	var (
		wg sync.WaitGroup
		// mu guards errs, completed and payload while workers are running.
		mu   sync.Mutex
		errs []error
	)
	syncAddErr := func(err error) {
		mu.Lock()
		defer mu.Unlock()
		errs = append(errs, err)
	}
	hasErrs := func() bool {
		mu.Lock()
		defer mu.Unlock()
		return len(errs) > 0
	}
	isCompleted := func(item string) bool {
		mu.Lock()
		defer mu.Unlock()
		_, done := completed[item]
		return done
	}

	var gracefulShutdownStart time.Time
	for _, item := range items {
		// Workers write to completed concurrently, so the lookup must be locked.
		if isCompleted(item) {
			fmt.Printf("skipping %s\n", item)
			continue
		}

		boundedCh <- struct{}{}

		// Checked after acquiring a slot so that a failure reported while we
		// were blocked is noticed before more work is started.
		if hasErrs() {
			gracefulShutdownStart = time.Now()
			fmt.Println("stopping after allowing in flight processes to finish")
			break
		}

		it := item // per-iteration copy for the closure (needed before Go 1.22)
		wg.Add(1)
		go func() {
			defer func() {
				wg.Done()
				<-boundedCh
			}()

			if err := ctx.Err(); err != nil {
				syncAddErr(err)
				return
			}

			fmt.Printf("processing %s\n", it)
			err := retry.Do(func() error {
				return process(it)
			}, retry.Attempts(maxPerItemRetries), retry.MaxJitter(time.Millisecond*maxRetryJitterInMs))
			if err != nil {
				syncAddErr(errors.Wrapf(err, "processing %s", it))
				return
			}

			mu.Lock()
			defer mu.Unlock()
			completed[it] = struct{}{}
			if err := setPayload(payload); err != nil {
				// mu is already held here and sync.Mutex is not reentrant, so
				// append directly instead of calling syncAddErr (which would
				// deadlock).
				errs = append(errs, errors.Wrapf(err, "saving completed %s", it))
				return
			}
			fmt.Printf("successfully completed %s\n", it)
		}()
	}

	wg.Wait()

	// All workers have returned, so errs and payload can be used without mu.
	if !gracefulShutdownStart.IsZero() {
		fmt.Printf("remining inflight processes took %.2fs to finish\n", time.Since(gracefulShutdownStart).Seconds())
	}
	if len(errs) > 0 {
		log.Fatal(errs)
	}

	payload[payloadKeyCompletedAt] = time.Now()
	err = setPayload(payload)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("completed successfully")
}
// process simulates handling a single item. It sleeps for a random duration
// below maxProcessWaitTimeInMs milliseconds and then runs three simulated
// steps (A, B, C) in order; a step fails when a fresh random value is
// >= failureRate, and the first failing step's error is returned. It returns
// nil when all three steps pass. The item argument is not inspected.
func process(item string) error {
	delay := time.Duration(rand.Intn(maxProcessWaitTimeInMs)) * time.Millisecond
	time.Sleep(delay)

	for _, step := range []string{"A", "B", "C"} {
		if rand.Float32() < failureRate {
			continue // this step succeeded
		}
		return errors.New("doing thing " + step + " failed")
	}
	return nil
}
// payloadFilename is the path of the JSON file that getPayload reads and
// setPayload writes.
var payloadFilename = "jobs.json"
// getPayload loads the persisted job state from payloadFilename.
//
// It returns an empty, non-nil map when the file does not exist yet or when
// the file holds JSON null, so callers can always assign into the result.
// Any other read error, or a JSON decoding error, is returned as-is with a
// nil map.
func getPayload() (map[string]interface{}, error) {
	b, err := ioutil.ReadFile(payloadFilename)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			// First run: nothing has been persisted yet.
			return map[string]interface{}{}, nil
		}
		return nil, err
	}

	var payload map[string]interface{}
	if err := json.Unmarshal(b, &payload); err != nil {
		return nil, err
	}
	if payload == nil {
		// JSON null decodes without error but leaves the map nil; writing to
		// a nil map would panic in the caller.
		payload = map[string]interface{}{}
	}
	return payload, nil
}
// setPayload persists payload as JSON to payloadFilename.
//
// The data is first written to a sibling temporary file and then renamed over
// the target, so an interruption in the middle of a write cannot leave a
// truncated checkpoint behind. The file is created with mode 0644 (before
// umask) rather than world-writable/executable permissions.
//
// It is not safe to call concurrently: callers must serialize calls, since
// every call uses the same temporary file name.
func setPayload(payload map[string]interface{}) error {
	jsonPayload, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	tmpFilename := payloadFilename + ".tmp"
	if err := ioutil.WriteFile(tmpFilename, jsonPayload, 0644); err != nil {
		return err
	}
	if err := os.Rename(tmpFilename, payloadFilename); err != nil {
		// Best-effort cleanup; the rename error is the one worth reporting.
		_ = os.Remove(tmpFilename)
		return err
	}
	return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment