Skip to content

Instantly share code, notes, and snippets.

@wirepair
Created October 9, 2018 13:57
Show Gist options
  • Save wirepair/9213e80d8ce5794c42d937d5189b7045 to your computer and use it in GitHub Desktop.
Save wirepair/9213e80d8ce5794c42d937d5189b7045 to your computer and use it in GitHub Desktop.
re-sizable execution worker pool
// Super simple resizable execution worker pool that lets you add and remove workers easily.
// The doWork function should accept a context (or a similar mechanism) so that in-flight work can be cancelled when necessary.
// author: https://twitter.com/_wirepair
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// Workers is a resizable pool of worker goroutines. Every running worker is
// represented by the channel that gets closed to tell it to stop.
type Workers struct {
	// workerLock guards the workers map.
	workerLock sync.RWMutex

	// workers maps a worker id to that worker's close channel. Ids are derived
	// from time.Now().UnixNano(); a secure random number would also work.
	workers map[int64]chan struct{}

	// workerCount and maxWorkers are updated through sync/atomic by the pool's
	// methods. status appears unused by those methods -- TODO confirm.
	workerCount, maxWorkers, status int32
}
// New returns an empty pool whose target size is max workers. It only builds
// the bookkeeping; no goroutines are started until Run is called.
func New(max int32) *Workers {
	pool := new(Workers)
	pool.workers = map[int64]chan struct{}{}
	pool.maxWorkers = max
	return pool
}
// Run starts the initial set of workers: one doWork goroutine for each unit of
// maxWorkers as read when Run is called. Every worker's close channel is
// registered in the workers map so removeWorker can stop it later.
func (w *Workers) Run() {
	max := int(atomic.LoadInt32(&w.maxWorkers))
	for i := 0; i < max; i++ {
		closeCh := make(chan struct{})

		w.workerLock.Lock()
		// UnixNano can return the same value on consecutive iterations when the
		// platform clock is coarse. Reusing a key would overwrite the earlier
		// close channel and leave that worker impossible to stop, so probe
		// forward until the id is free.
		id := time.Now().UnixNano()
		for {
			if _, taken := w.workers[id]; !taken {
				break
			}
			id++
		}
		w.workers[id] = closeCh
		w.workerLock.Unlock()

		go w.doWork(closeCh)
		atomic.AddInt32(&w.workerCount, 1)
	}
}
// addWorker grows the pool by one: it bumps maxWorkers, registers a fresh
// close channel in the workers map, starts a new goroutine running doWork,
// and then bumps workerCount.
func (w *Workers) addWorker() {
	closeCh := make(chan struct{})
	atomic.AddInt32(&w.maxWorkers, 1)

	w.workerLock.Lock()
	// UnixNano is not guaranteed to be unique (coarse clocks, rapid calls).
	// Reusing a key would overwrite another worker's close channel and make
	// that worker impossible to stop, so probe forward until the id is free.
	id := time.Now().UnixNano()
	for {
		if _, taken := w.workers[id]; !taken {
			break
		}
		id++
	}
	w.workers[id] = closeCh
	w.workerLock.Unlock()

	go w.doWork(closeCh)
	atomic.AddInt32(&w.workerCount, 1)
}
// removeWorker picks an arbitrary worker (map iteration order is unspecified),
// closes its channel and deletes it from the map of workers. The worker only
// notices the close after it finishes its current doWork iteration.
//
// If the pool has no registered workers the call is a no-op, so maxWorkers and
// workerCount are never driven below zero.
func (w *Workers) removeWorker() {
	removed := false

	w.workerLock.Lock()
	for id, ch := range w.workers {
		close(ch)
		delete(w.workers, id)
		removed = true
		break // only one worker is removed per call
	}
	w.workerLock.Unlock()

	if !removed {
		return
	}
	atomic.AddInt32(&w.maxWorkers, -1)
	atomic.AddInt32(&w.workerCount, -1)
}
// doWork is the body of a single worker goroutine. It does one unit of
// (simulated) work per loop iteration and checks closeCh, without blocking,
// before each iteration; once closeCh has been closed it logs and returns.
func (w *Workers) doWork(closeCh chan struct{}) {
	// stopRequested reports whether closeCh is ready to receive, without
	// waiting on it.
	stopRequested := func() bool {
		select {
		case <-closeCh:
			return true
		default:
			return false
		}
	}

	for !stopRequested() {
		fmt.Println("doing worky...")
		time.Sleep(1 * time.Second)
	}
	fmt.Println("closed channel")
}
// main demonstrates the pool: start five workers, add a sixth, then remove
// all six and give them time to observe the shutdown before exiting.
func main() {
	const initialWorkers = 5

	pool := New(initialWorkers)
	pool.Run()
	time.Sleep(2 * time.Second)

	pool.addWorker()
	time.Sleep(1 * time.Second)

	fmt.Println("shutting down")
	// Remove every worker: the initial ones plus the one added above.
	for remaining := initialWorkers + 1; remaining > 0; remaining-- {
		pool.removeWorker()
	}

	// Workers only see the close at the top of their loop, so wait for them.
	time.Sleep(5 * time.Second)
	fmt.Println("done")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment