Created
March 25, 2019 16:00
-
-
Save SkamDart/5743d38997da1eee19ef706ac6716278 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"log" | |
"sync" | |
"time" | |
) | |
// Item is a single unit of work handed to the worker pool.
type Item struct {
	// Num identifies the item; workers include it in their log output.
	Num int
	// Complete is closed by the worker once the item has been processed.
	// No value is ever sent on it; the close itself is the signal.
	Complete chan int
}
func NewItem(n int) Item { | |
return Item{Num: n, Complete: make(chan int)} | |
} | |
func ThreadWorker(name string, group *sync.WaitGroup, queue chan Item) { | |
var idle time.Duration | |
for { | |
start := time.Now() | |
item, ok := <-queue | |
if !ok { | |
log.Printf("[%s] queue closed, worker exiting: idle %s", name, idle) | |
// Tell the wait-group we are done | |
group.Done() | |
return | |
} | |
idle += time.Now().Sub(start) | |
log.Printf("[%s] Processing %d", name, item.Num) | |
time.Sleep(time.Millisecond * 100) | |
close(item.Complete) | |
} | |
} | |
func PullAndSubmit(queue chan Item, batchSize int) { | |
log.Printf("[main] Submitting batch") | |
// "Pull" items and add the to the queue. | |
for i := 0; i < batchSize; i++ { | |
item := NewItem(i) | |
queue <- item | |
} | |
} | |
func StartThreadPool(queue chan Item, capacity int) *sync.WaitGroup { | |
waitGroup := &sync.WaitGroup{} | |
// Start up my "thread pool" | |
waitGroup.Add(capacity) | |
for i := 0; i < capacity; i++ { | |
go ThreadWorker(fmt.Sprintf("%d", i+1), waitGroup, queue) | |
} | |
return waitGroup | |
} | |
func main() { | |
// work queue with capacity that is same as our thread pool | |
queue := make(chan Item, 15) | |
// Start thread pool | |
waitGroup := StartThreadPool(queue, 15) | |
// Submit first batch of work, we only get batches of messages in groups of 10 though, | |
// so always less than the number of threads | |
PullAndSubmit(queue, 10) | |
// Submit more batches of work (continue forever) | |
PullAndSubmit(queue, 10) | |
PullAndSubmit(queue, 10) | |
PullAndSubmit(queue, 10) | |
PullAndSubmit(queue, 10) | |
PullAndSubmit(queue, 10) | |
// "Worker" is shutdown, so signal to our threads to stop working, | |
// and wait for all work to complete | |
log.Printf("[main] Done submitting to queue, waiting for work to finish") | |
close(queue) | |
waitGroup.Wait() | |
log.Printf("[main] All work finished") | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment