Skip to content

Instantly share code, notes, and snippets.

@SkamDart
Created March 25, 2019 16:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save SkamDart/5743d38997da1eee19ef706ac6716278 to your computer and use it in GitHub Desktop.
Save SkamDart/5743d38997da1eee19ef706ac6716278 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"log"
"sync"
"time"
)
// Item is a unit of work that is sent over the queue channel to a worker.
type Item struct {
// Num is the item's number; the worker logs it when processing the item.
Num int
// Complete is closed by ThreadWorker once the item has been processed.
Complete chan int
}
// NewItem builds an Item numbered n together with a freshly allocated,
// unbuffered Complete channel.
func NewItem(n int) Item {
	done := make(chan int)
	return Item{
		Num:      n,
		Complete: done,
	}
}
// ThreadWorker receives Items from queue until the channel is closed and
// drained, processing them one at a time.
//
// name is used only as a prefix in log output. group is the WaitGroup the
// worker was registered with; Done is called on it exactly once when the
// worker returns. For every received item the worker logs its number, sleeps
// 100ms to simulate work, and then closes item.Complete to signal completion.
//
// The idle duration reported on exit is the total time spent blocked waiting
// for items that were actually received; the final wait that ends with the
// queue being closed is not included.
func ThreadWorker(name string, group *sync.WaitGroup, queue chan Item) {
	// Deferred so the wait-group is released on every exit path.
	defer group.Done()

	var idle time.Duration
	waitStart := time.Now()
	for item := range queue {
		idle += time.Since(waitStart)

		log.Printf("[%s] Processing %d", name, item.Num)
		time.Sleep(time.Millisecond * 100)

		// An Item built without NewItem has a nil Complete channel, and
		// closing a nil channel panics, so only close when it is set.
		if item.Complete != nil {
			close(item.Complete)
		}

		// Restart the idle clock just before blocking on the next receive.
		waitStart = time.Now()
	}

	log.Printf("[%s] queue closed, worker exiting: idle %s", name, idle)
}
// PullAndSubmit simulates pulling one batch of work: it creates batchSize
// items numbered 0 through batchSize-1 and sends each of them to queue, in
// order. Each send blocks while the queue has no free capacity.
func PullAndSubmit(queue chan Item, batchSize int) {
	log.Printf("[main] Submitting batch")

	// "Pull" the items and add them to the queue one by one.
	for num := 0; num < batchSize; num++ {
		queue <- NewItem(num)
	}
}
// StartThreadPool launches capacity ThreadWorker goroutines that all read
// from queue. Workers are named "1" through the decimal form of capacity.
//
// The returned WaitGroup has been incremented by capacity before any worker
// starts; each worker calls Done on it when it exits, so Wait returns once
// every worker has finished.
func StartThreadPool(queue chan Item, capacity int) *sync.WaitGroup {
	wg := new(sync.WaitGroup)

	// Register all workers up front, then start the "thread pool".
	wg.Add(capacity)
	for idx := 0; idx < capacity; idx++ {
		workerName := fmt.Sprintf("%d", idx+1)
		go ThreadWorker(workerName, wg, queue)
	}

	return wg
}
// main runs the demo: it starts a pool of workers, submits several batches
// of items to the shared queue, then closes the queue and waits for every
// worker to exit.
func main() {
	const (
		// poolSize is both the number of workers and the queue's buffer size.
		poolSize = 15
		// batchSize is the number of items per batch; batches are always
		// smaller than the number of workers.
		batchSize = 10
		// batchCount is how many batches are submitted before shutting down.
		batchCount = 6
	)

	// Work queue whose capacity matches the size of the thread pool.
	queue := make(chan Item, poolSize)

	// Start the thread pool.
	waitGroup := StartThreadPool(queue, poolSize)

	// Submit the batches of work one after another.
	for batch := 0; batch < batchCount; batch++ {
		PullAndSubmit(queue, batchSize)
	}

	// Closing the queue signals the workers to stop once it is drained;
	// then wait for all of them to finish.
	log.Printf("[main] Done submitting to queue, waiting for work to finish")
	close(queue)
	waitGroup.Wait()
	log.Printf("[main] All work finished")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment