Skip to content

Instantly share code, notes, and snippets.

@knzm
Created January 10, 2019 05:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save knzm/152663b44c2ffec22506643e80f5769c to your computer and use it in GitHub Desktop.
Save knzm/152663b44c2ffec22506643e80f5769c to your computer and use it in GitHub Desktop.
package main
import (
"errors"
"fmt"
"log"
"math/rand"
"sync"
"time"
)
// task is a named unit of work handed from the dispatcher to a worker.
type task struct {
	// name labels the task in log output and in its result.
	name string
	// f is the function a worker invokes to carry out the task.
	f func() error
}
// result is the outcome of one executed task, reported by a worker.
type result struct {
	// name is the name of the task that produced this result.
	name string
	// err is the error returned by the task's function; nil means success.
	err error
}
// Sizing of the demo run.
const (
	// numTasks is how many tasks the dispatcher tries to hand out.
	numTasks = 100
	// numWorkers is how many worker goroutines consume tasks concurrently.
	numWorkers = 10
)
// lottery simulates a task with a random outcome.
//
// It draws an integer in [0, 10). A multiple of three (0, 3, 6 or 9) fails
// immediately with a "bingo!" error. Any other draw sleeps for
// draw*draw*100 milliseconds and then reports success with a nil error.
func lottery() error {
	draw := rand.Intn(10)
	if draw%3 != 0 {
		pause := time.Duration(draw*draw*100) * time.Millisecond
		time.Sleep(pause)
		return nil
	}
	return errors.New("bingo!")
}
// main runs numTasks lottery tasks on a pool of numWorkers goroutines and
// stops either when every task has completed or when a randomly timed
// interrupt fires, whichever comes first.
//
// Goroutines involved:
//   - workers: receive tasks from taskCh, run them, report on resultCh
//   - dispatcher: feeds taskCh, waits for dispatched tasks to finish, then
//     closes resultCh and taskCh
//   - interrupter: closes quitCh after a random delay of 0-29 seconds unless
//     doneCh is closed first
//   - main goroutine: collects results until resultCh is closed
func main() {
	taskCh := make(chan task)
	resultCh := make(chan result)
	quitCh := make(chan struct{})

	var workerWaitGroup sync.WaitGroup
	var taskWaitGroup sync.WaitGroup

	// workers
	for i := 0; i < numWorkers; i++ {
		workerWaitGroup.Add(1)
		// The worker id is passed as an argument so each goroutine owns its
		// value. Capturing the loop variable directly is a data race and logs
		// wrong ids on toolchains older than Go 1.22.
		go func(id int) {
			log.Printf("worker-%d start", id)
			defer log.Printf("worker-%d finished", id)
			defer workerWaitGroup.Done()

			for {
				select {
				case <-quitCh:
					return
				case t, ok := <-taskCh:
					if !ok {
						// taskCh is closed
						return
					}
					log.Println(t.name, "start ...")
					err := t.f()
					// A task that has started always reports its result, even
					// after quitCh is closed; the collector keeps draining
					// resultCh until the dispatcher closes it.
					resultCh <- result{
						name: t.name,
						err:  err,
					}
					log.Println(t.name, "done")
					taskWaitGroup.Done()
				}
			}
		}(i + 1)
	}

	// dispatcher
	go func() {
		log.Println("dispatcher start")
		defer log.Println("dispatcher finished")
		// Deferred calls run last-in first-out: resultCh is closed first,
		// which ends the collector loop, then taskCh, which releases idle
		// workers. Closing resultCh is safe because taskWaitGroup.Wait below
		// returns only after every dispatched task has sent its result.
		defer close(taskCh)
		defer close(resultCh)

	dispatch:
		for i := 0; i < numTasks; i++ {
			t := task{
				name: fmt.Sprintf("task-%d", i+1),
				f:    lottery,
			}
			// Count the task before offering it. If Add ran after the send, a
			// fast worker could call Done first and panic with a negative
			// WaitGroup counter.
			taskWaitGroup.Add(1)
			select {
			case <-quitCh:
				// The task was never handed out; undo the Add.
				taskWaitGroup.Done()
				break dispatch
			case taskCh <- t:
			}
		}

		log.Println("waiting taskWaitGroup ...")
		taskWaitGroup.Wait()
		log.Println("taskWaitGroup passed")
	}()

	doneCh := make(chan struct{})
	var wg sync.WaitGroup
	wg.Add(1)

	// interrupter
	go func() {
		defer wg.Done()
		n := rand.Intn(30)
		select {
		case <-time.After(time.Duration(n) * time.Second):
			log.Println("Interrupted!")
			close(quitCh)
		case <-doneCh:
		}
	}()

	// collector
	for res := range resultCh {
		if res.err != nil {
			log.Println(res.name, res.err)
		}
	}

	log.Println("waiting workerWaitGroup ...")
	workerWaitGroup.Wait()
	log.Println("workerWaitGroup passed")

	// wait interrupter
	close(doneCh)
	wg.Wait()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment