// collect post-processes the channel "Results" and calls the ResultProcessorFunc passed in as reference
// for further processing.
func (m *Pool) collect(proc ResultProcessorFunc) {
	log.DEBUG.Print("goRoutine collect starting")
	for result := range m.results {
		outcome := proc(result)
		log.DEBUG.Printf("Job with id: [%d] completed, outcome: %s", result.Job.id, outcome)
	}
	log.DEBUG.Print("goRoutine collect done, setting channel done as completed")
	m.done <- true
}
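// Note: the snippets in this gist reference a few types that are not shown. The definitions below are
// a minimal sketch inferred from how they are used above and below; the field name "payload" in Result
// is an assumption (only result.Job.id is referenced directly, and the Manager variant further down
// uses a lowercase "job" field instead).

// Job wraps a single resource to be processed, identified by its position in the input slice.
type Job struct {
	id       int
	resource interface{}
}

// Result pairs the Job with whatever the ProcessorFunc returned for it.
type Result struct {
	Job     Job
	payload interface{}
}

// ProcessorFunc performs the actual work on a single resource.
type ProcessorFunc func(resource interface{}) interface{}

// ResultProcessorFunc post-processes a Result and returns a printable outcome.
type ResultProcessorFunc func(result Result) string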
// work performs the actual work by calling the processor and passing in the Job as reference obtained
// from iterating over the "Jobs" channel.
func (m *Pool) work(wg *sync.WaitGroup, processor ProcessorFunc) {
	defer wg.Done()
	log.DEBUG.Print("goRoutine work starting")
	for job := range m.jobs {
		log.DEBUG.Printf("working on Job ID [%d]", job.id)
		output := Result{job, processor(job.resource)}
		m.results <- output
		log.DEBUG.Printf("done with Job ID [%d]", job.id)
	}
}
// workerPool creates or spawns new "work" goRoutines to process the "Jobs" channel.
func (m *Pool) workerPool(processor ProcessorFunc) {
	defer close(m.results)
	log.DEBUG.Printf("Worker Pool spawning new goRoutines, total: [%d]", m.numRoutines)
	var wg sync.WaitGroup
	for i := 0; i < m.numRoutines; i++ {
		wg.Add(1)
		go m.work(&wg, processor)
		log.DEBUG.Printf("Spawned work goRoutine [%d]", i)
	}
	wg.Wait()
}
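// The methods above and below all hang off a Pool value whose definition is not part of this gist.
// A plausible layout, assuming unbuffered channels (buffered channels would only change scheduling,
// not correctness):
type Pool struct {
	numRoutines int
	jobs        chan Job
	results     chan Result
	done        chan bool
}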
// allocate allocates jobs based on an array of resources to be processed by the worker pool.
func (m *Pool) allocate(jobs []interface{}) {
	defer close(m.jobs)
	log.DEBUG.Printf("Allocating [%d] resources", len(jobs))
	for i, v := range jobs {
		job := Job{id: i, resource: v}
		m.jobs <- job
	}
	log.DEBUG.Print("Done Allocating.")
}
// Start starts the worker pool process and blocks until all results have been collected.
func (m *Pool) Start(resources []interface{}, procFunc ProcessorFunc, resFunc ResultProcessorFunc) {
	log.DEBUG.Print("worker pool starting")
	startTime := time.Now()
	go m.allocate(resources)
	m.done = make(chan bool)
	go m.collect(resFunc)
	go m.workerPool(procFunc)
	<-m.done
	endTime := time.Now()
	log.DEBUG.Printf("worker pool done, time taken: [%s]", endTime.Sub(startTime))
}
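// NewPool is referenced by the usage example below but not shown in the gist. A minimal sketch,
// assuming the Pool layout above; Start creates the done channel itself, so the constructor only
// needs to set up jobs and results:
func NewPool(numRoutines int) *Pool {
	return &Pool{
		numRoutines: numRoutines,
		jobs:        make(chan Job),
		results:     make(chan Result),
	}
}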
func main() {
	pool := workerpool.NewPool(3) // 3 goRoutines (workers)
	pool.Start(resources, ResourceProcessor, ResultProcessor)
}
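// main above also assumes a resources slice and two callbacks that the gist does not define. The
// implementations below are purely hypothetical examples matching the ProcessorFunc and
// ResultProcessorFunc signatures; they need the "fmt" and "strings" imports, and in a real program
// they would live in package main and refer to workerpool.Result rather than the bare types.

// resources is some arbitrary work to feed into the pool.
var resources = []interface{}{"alpha", "beta", "gamma"}

// ResourceProcessor does the per-resource work; here it just upper-cases a string.
func ResourceProcessor(resource interface{}) interface{} {
	s, _ := resource.(string)
	return strings.ToUpper(s)
}

// ResultProcessor turns a Result into the printable outcome that collect logs.
func ResultProcessor(result Result) string {
	return fmt.Sprintf("got %v", result.payload)
}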
// Collect post-processes the channel "Results" and calls the ResultProcessorFunc passed in as reference
// for further processing.
func (m *Manager) Collect(proc ResultProcessorFunc) {
	for result := range m.results {
		outcome := proc(result)
		fmt.Printf("Job with id: [%d] completed, outcome: %s\n", result.job.id, outcome)
	}
	m.done <- true
}
// work performs the actual work by calling the processor and passing in the Job as reference obtained
// from iterating over the "Jobs" channel.
func (m *Manager) work(wg *sync.WaitGroup, processor ProcessorFunc) {
	defer wg.Done()
	for job := range m.jobs {
		output := Result{job, processor(job.resource)}
		m.results <- output
	}
}
// workerPool creates or spawns new "work" goRoutines to process the "Jobs" channel.
func (m *Manager) workerPool(processor ProcessorFunc) {
	defer close(m.results)
	var wg sync.WaitGroup
	for i := 0; i < m.numRoutines; i++ {
		wg.Add(1)
		go m.work(&wg, processor)
	}
	wg.Wait()
}
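// The Manager variant above is an earlier take on the same pattern, and the gist does not show how its
// pieces get wired together. Assuming Manager carries the same jobs/results/done/numRoutines fields as
// Pool and has an allocate method like the one above, a Start method would mirror Pool.Start:
func (m *Manager) Start(resources []interface{}, procFunc ProcessorFunc, resFunc ResultProcessorFunc) {
	m.done = make(chan bool)
	go m.allocate(resources)  // feed the jobs channel and close it once every resource is queued
	go m.Collect(resFunc)     // drain results until workerPool closes the channel
	go m.workerPool(procFunc) // spawn the workers; closes results once they all return
	<-m.done                  // Collect signals done after the results channel is drained
}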