Last active
November 27, 2017 23:51
-
-
Save mettledrum/82ef30b234e194c0d6b09931fecb7415 to your computer and use it in GitHub Desktop.
workers that run until error
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"errors" | |
"fmt" | |
"sync" | |
"time" | |
) | |
// what the consumer does | |
type handler func(string) error | |
type consumer struct { | |
workChan <-chan string | |
errChan chan<- error | |
handler handler | |
quitChan chan bool | |
once *sync.Once | |
waitGroup *sync.WaitGroup | |
id int | |
} | |
type ConsumerGroup struct { | |
consumers []consumer | |
waitGroup *sync.WaitGroup | |
errChan <-chan error | |
} | |
func NewConsumerGroup(ct int, wc <-chan string, h handler) *ConsumerGroup { | |
consumers := make([]consumer, ct) | |
ec := make(chan error) | |
wg := &sync.WaitGroup{} | |
wg.Add(ct) | |
for i := 0; i < ct; i++ { | |
consumers[i] = newConsumer(i, wc, h, ec, wg) | |
} | |
return &ConsumerGroup{ | |
consumers: consumers, | |
waitGroup: wg, | |
errChan: ec, | |
} | |
} | |
func (g *ConsumerGroup) RunUntilError() error { | |
// start 'em up | |
for i := 0; i < len(g.consumers); i++ { | |
go g.consumers[i].run() | |
} | |
var err error | |
// listen for errors | |
go func() { | |
for { | |
select { | |
case err = <-g.errChan: | |
// could √ for errors that are unrecoverable | |
fmt.Printf("consumer group got error: %s; closing\n", err) | |
g.Stop() | |
return | |
} | |
} | |
}() | |
g.waitGroup.Wait() | |
return err | |
} | |
func (g *ConsumerGroup) Stop() { | |
for i := range g.consumers { | |
g.consumers[i].stop() | |
} | |
} | |
func newConsumer(id int, wc <-chan string, h handler, ec chan<- error, wg *sync.WaitGroup) consumer { | |
return consumer{ | |
workChan: wc, | |
handler: h, | |
quitChan: make(chan bool), | |
once: &sync.Once{}, | |
errChan: ec, | |
waitGroup: wg, | |
id: id, | |
} | |
} | |
func (c *consumer) run() { | |
fmt.Printf("starting %d\n", c.id) | |
for { | |
select { | |
case <-c.quitChan: | |
fmt.Printf("closing %d\n", c.id) | |
c.waitGroup.Done() | |
return | |
case s := <-c.workChan: | |
err := c.handler(s) | |
// retry logic | |
if err != nil { | |
c.errChan <- err | |
} | |
} | |
} | |
} | |
func (c *consumer) stop() { | |
c.once.Do(func() { close(c.quitChan) }) | |
} | |
func main() { | |
wc := make(chan string) | |
wrk := func(s string) error { | |
fmt.Printf("workin hard on: %s\n", s) | |
time.Sleep(2 * time.Second) // hard work here | |
fmt.Printf("finished: %s\n", s) | |
// simulate a handler failing | |
if s == "work: 5" { | |
return errors.New("failed while working") | |
} | |
return nil | |
} | |
g := NewConsumerGroup(5, wc, wrk) | |
// simulation of some producer | |
go func() { | |
ct := 0 | |
for { | |
wc <- fmt.Sprintf("work: %d", ct) | |
time.Sleep(1 * time.Millisecond) // work is being generated quickly | |
ct++ | |
} | |
}() | |
err := g.RunUntilError() | |
fmt.Printf("final error: %v\n", err) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment