This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Collect post processes the channel "Results" and calls the ResultProcessorFunc passed in as reference | |
// for further processing. | |
func (m *Pool) collect(proc ResultProcessorFunc) { | |
log.DEBUG.Print("goRoutine collect starting") | |
for result := range m.results { | |
outcome := proc(result) | |
log.DEBUG.Printf("Job with id: [%d] completed, outcome: %s", result.Job.id, outcome) | |
} | |
log.DEBUG.Print("goRoutine collect done, setting channel done as completed") | |
m.done <- true |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Collect post processes the channel "Results" and calls the ResultProcessorFunc passed in as reference | |
// for further processing. | |
func (m *Pool) collect(proc ResultProcessorFunc) { | |
log.DEBUG.Print("goRoutine collect starting") | |
for result := range m.results { | |
outcome := proc(result) | |
log.DEBUG.Printf("Job with id: [%d] completed, outcome: %s", result.Job.id, outcome) | |
} | |
log.DEBUG.Print("goRoutine collect done, setting channel done as completed") | |
m.done <- true |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// work performs the actual work by calling the processor and passing in the Job as reference obtained | |
// from iterating over the "Jobs" channel | |
func (m *Pool) work(wg *sync.WaitGroup, processor ProcessorFunc) { | |
defer wg.Done() | |
log.DEBUG.Print("goRoutine work starting") | |
for job := range m.jobs { | |
log.DEBUG.Printf("working on Job ID [%d]", job.id) | |
output := Result{job, processor(job.resource)} | |
m.results <- output | |
log.DEBUG.Printf("done with Job ID [%d]", job.id) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// workerPool creates or spawns new "work" goRoutines to process the "Jobs" channel | |
func (m *Pool) workerPool(processor ProcessorFunc) { | |
defer close(m.results) | |
log.DEBUG.Printf("Worker Pool spawning new goRoutines, total: [%d]", m.numRoutines) | |
var wg sync.WaitGroup | |
for i := 0; i < m.numRoutines; i++ { | |
wg.Add(1) | |
go m.work(&wg, processor) | |
log.DEBUG.Printf("Spawned work goRoutine [%d]", i) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// allocate allocates jobs based on an array of resources to be processed by the worker pool | |
func (m *Pool) allocate(jobs []interface{}) { | |
defer close(m.jobs) | |
log.DEBUG.Printf("Allocating [%d] resources", len(jobs)) | |
for i, v := range jobs { | |
job := Job{id: i, resource: v} | |
m.jobs <- job | |
} | |
log.DEBUG.Print("Done Allocating.") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Start starts the worker pool process | |
func (m *Pool) Start(resources []interface{}, procFunc ProcessorFunc, resFunc ResultProcessorFunc) { | |
log.DEBUG.Print("worker pool starting") | |
startTime := time.Now() | |
go m.allocate(resources) | |
m.done = make(chan bool) | |
go m.collect(resFunc) | |
go m.workerPool(procFunc) | |
<-m.done | |
endTime := time.Now() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
func main() { | |
pool := workerpool.NewPool(3) // 3 goRoutines(workers) | |
pool.Start(resources, ResourceProcessor, ResultProcessor) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Collect post processes the channel "Results" and calls the ResultProcessorFunc passed in as reference | |
// for further processing. | |
func (m *Manager) Collect(proc ResultProcessorFunc) { | |
for result := range m.results { | |
outcome := proc(result) | |
fmt.Printf("Job with id: [%d] completed, outcome: %s", result.job.id, outcome) | |
} | |
m.done <- true | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// work performs the actual work by calling the processor and passing in the Job as reference obtained | |
// from iterating over the "Jobs" channel | |
func (m *Manager) work(wg *sync.WaitGroup, processor ProcessorFunc) { | |
defer wg.Done() | |
for job := range m.jobs { | |
output := Result{job, processor(job.resource)} | |
m.results <- output | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// workerPool creates or spawns new "work" goRoutines to process the "Jobs" channel | |
func (m *Manager) workerPool(processor ProcessorFunc) { | |
defer close(m.results) | |
var wg sync.WaitGroup | |
for i := 0; i < m.numRoutines; i++ { | |
wg.Add(1) | |
go m.work(&wg, processor) | |
} | |
wg.Wait() | |
} |
NewerOlder