Skip to content

Instantly share code, notes, and snippets.

@jakexks
Last active August 23, 2021 12:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jakexks/bd1b460eee3c05f1e97b5f0364d154ae to your computer and use it in GitHub Desktop.
Save jakexks/bd1b460eee3c05f1e97b5f0364d154ae to your computer and use it in GitHub Desktop.
package main
import (
"context"
"log"
"time"
"github.com/go-flexible/flex"
)
func main() {
ctx, stop := context.WithTimeout(context.Background(), 10*time.Second)
defer stop()
queue := make(chan message, 2000)
workComplete := make(chan struct{})
var apps []flex.Worker
apps = append(apps, &producer{queue: queue, complete: workComplete})
for i := 1; i <= 5; i++ {
apps = append(apps, &consumer{queue: queue, id: i})
}
flex.MustStart(ctx, apps...)
}
type message struct {
foo string
}
type producer struct {
queue chan<- message
complete chan struct{}
}
type consumer struct {
id int
queue <-chan message
}
func (p *producer) Run(ctx context.Context) error {
log.Println("app starting")
for i := 0; i < 2000; i++ {
p.queue <- message{foo: "bar"}
}
log.Println("produced all messages")
close(p.complete)
<-ctx.Done()
return ctx.Err()
}
func (p *producer) Halt(ctx context.Context) error {
select {
case <-ctx.Done():
log.Println("producer cancelled")
return ctx.Err()
case <-p.complete:
log.Println("producer ended successfully")
return nil
}
}
func (c *consumer) Run(ctx context.Context) error {
// receive work but halt when ctx done
log.Printf("consumer %d starting\n", c.id)
for {
select {
case <-ctx.Done():
log.Printf("consumer %d cancelled\n", c.id)
return ctx.Err()
case <-c.queue:
log.Printf("consumer %d processing message", c.id)
time.Sleep(6 * time.Second)
}
}
}
func (c *consumer) Halt(ctx context.Context) error {
select {
case <-ctx.Done():
log.Printf("consumer %d cancelled\n", c.id)
return ctx.Err()
default:
return nil
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment