Skip to content

Instantly share code, notes, and snippets.

@mettledrum
Last active November 27, 2017 23:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mettledrum/82ef30b234e194c0d6b09931fecb7415 to your computer and use it in GitHub Desktop.
Save mettledrum/82ef30b234e194c0d6b09931fecb7415 to your computer and use it in GitHub Desktop.
workers that run until error
package main
import (
"errors"
"fmt"
"sync"
"time"
)
// handler is what a consumer does with each unit of work: it is called with
// every string the consumer receives from its work channel, and a non-nil
// return value is forwarded to the consumer's error channel.
type handler func(string) error
// consumer is a single worker: it receives strings from workChan, passes them
// to handler, and reports handler failures on errChan until quitChan is closed.
type consumer struct {
	id      int     // identifier used in the log output
	handler handler // invoked with every item received from workChan

	workChan <-chan string // receive-only source of work items
	errChan  chan<- error  // send-only sink for handler errors
	quitChan chan bool     // closed by stop to make run return

	once      *sync.Once      // ensures quitChan is closed at most once
	waitGroup *sync.WaitGroup // run calls Done on it just before returning
}
// ConsumerGroup owns a set of consumers together with the wait group and the
// error channel they share, so they can be started, stopped and awaited as one.
type ConsumerGroup struct {
	errChan   <-chan error    // receive side of the channel the consumers report on
	waitGroup *sync.WaitGroup // same WaitGroup that is handed to each consumer
	consumers []consumer      // the workers managed by this group
}
// NewConsumerGroup builds a group of ct consumers that all receive from wc and
// process each item with h. The consumers share a single unbuffered error
// channel and a WaitGroup whose counter is preset to ct. No goroutine is
// started here.
func NewConsumerGroup(ct int, wc <-chan string, h handler) *ConsumerGroup {
	var (
		errs    = make(chan error)
		pending = new(sync.WaitGroup)
		workers = make([]consumer, ct)
	)
	pending.Add(ct)

	// Consumer ids are simply their position in the slice.
	for id := range workers {
		workers[id] = newConsumer(id, wc, h, errs, pending)
	}

	group := new(ConsumerGroup)
	group.consumers = workers
	group.waitGroup = pending
	group.errChan = errs
	return group
}
// RunUntilError starts every consumer in its own goroutine and blocks until
// all of them have exited.
//
// The first error reported by any consumer stops the whole group and becomes
// the return value; errors reported afterwards are discarded. If the consumers
// exit without ever reporting an error (for example because Stop was called
// from elsewhere), nil is returned.
func (g *ConsumerGroup) RunUntilError() error {
	// start 'em up
	for i := range g.consumers {
		go g.consumers[i].run()
	}

	// done is closed once every consumer has called Done on the wait group.
	// Turning Wait into a channel lets it be combined with the error channel
	// in a select, and this helper goroutine always terminates with the group.
	done := make(chan struct{})
	go func() {
		g.waitGroup.Wait()
		close(done)
	}()

	var firstErr error
	select {
	case firstErr = <-g.errChan:
		// could check here for errors that are unrecoverable
		fmt.Printf("consumer group got error: %s; closing\n", firstErr)
		g.Stop()
	case <-done:
		// Everybody quit without reporting anything.
		return nil
	}

	// Keep receiving until every consumer is gone. The error channel is
	// unbuffered, so a consumer that fails after the first error would
	// otherwise block on its send forever, never notice the quit signal, and
	// keep the wait group from ever reaching zero.
	for {
		select {
		case <-g.errChan:
			// later errors are dropped; only the first one is returned
		case <-done:
			return firstErr
		}
	}
}
// Stop signals every consumer in the group to quit. It only delivers the
// signal and does not wait for the consumers to finish.
func (g *ConsumerGroup) Stop() {
	for idx := 0; idx < len(g.consumers); idx++ {
		worker := &g.consumers[idx]
		worker.stop()
	}
}
// newConsumer assembles the consumer with the given id. wc, h, ec and wg are
// stored as passed in; the quit channel and its sync.Once are created fresh
// for each consumer.
func newConsumer(id int, wc <-chan string, h handler, ec chan<- error, wg *sync.WaitGroup) consumer {
	var c consumer

	// collaborators supplied by the caller
	c.id = id
	c.handler = h
	c.workChan = wc
	c.errChan = ec
	c.waitGroup = wg

	// per-consumer shutdown state
	c.quitChan = make(chan bool)
	c.once = new(sync.Once)

	return c
}
// run is the consumer's main loop. It hands every item received from workChan
// to the handler and forwards handler errors to errChan. It returns, after
// calling Done on the wait group, when quitChan is closed or when workChan is
// closed.
func (c *consumer) run() {
	// Deferred so the wait group is released on every exit path.
	defer c.waitGroup.Done()

	fmt.Printf("starting %d\n", c.id)
	for {
		select {
		case <-c.quitChan:
			fmt.Printf("closing %d\n", c.id)
			return
		case s, ok := <-c.workChan:
			if !ok {
				// The producer closed the channel: there will never be more
				// work. Without this check the loop would spin, calling the
				// handler with the zero value on every iteration.
				fmt.Printf("closing %d\n", c.id)
				return
			}
			err := c.handler(s)
			// retry logic
			if err == nil {
				continue
			}
			// Report the error, but stay responsive to the quit signal: if
			// nobody is receiving on errChan any more, an unconditional send
			// would block forever and this consumer could never be stopped.
			select {
			case c.errChan <- err:
			case <-c.quitChan:
				fmt.Printf("closing %d\n", c.id)
				return
			}
		}
	}
}
// stop tells the consumer to quit by closing its quit channel. The close goes
// through the consumer's sync.Once, so repeated calls do not close the channel
// a second time.
func (c *consumer) stop() {
	shutdown := func() {
		close(c.quitChan)
	}
	c.once.Do(shutdown)
}
// main is a small demo: a producer goroutine generates numbered work items
// much faster than the five consumers can process them, the handler fails on
// one specific item, and the error returned by the group is printed.
func main() {
	jobs := make(chan string)

	// Simulated handler: every item takes two seconds, and exactly one item
	// is made to fail.
	work := func(s string) error {
		fmt.Printf("workin hard on: %s\n", s)
		time.Sleep(2 * time.Second) // hard work here
		fmt.Printf("finished: %s\n", s)
		// simulate a handler failing
		if s != "work: 5" {
			return nil
		}
		return errors.New("failed while working")
	}

	group := NewConsumerGroup(5, jobs, work)

	// simulation of some producer; it runs for the lifetime of the process
	go func() {
		for n := 0; ; n++ {
			jobs <- fmt.Sprintf("work: %d", n)
			time.Sleep(1 * time.Millisecond) // work is being generated quickly
		}
	}()

	fmt.Printf("final error: %v\n", group.RunUntilError())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment