Created
June 11, 2020 09:14
-
-
Save velotiotech/d9ad95b74d09738e676c746e9c62d8d1 to your computer and use it in GitHub Desktop.
Task executor example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package executor | |
import ( | |
"log" | |
"sync/atomic" | |
) | |
// The Executor struct is the main executor for tasks. | |
// 'maxWorkers' represents the maximum number of simultaneous goroutines. | |
// 'ActiveWorkers' tells the number of active goroutines spawned by the Executor at given time. | |
// 'Tasks' is the channel on which the Executor receives the tasks. | |
// 'Reports' is channel on which the Executor publishes the every tasks reports. | |
// 'signals' is channel that can be used to control the executor. Right now, only the termination | |
// signal is supported which is essentially is sending '1' on this channel by the client. | |
type Executor struct { | |
maxWorkers int64 | |
ActiveWorkers int64 | |
Tasks chan Task | |
Reports chan Report | |
signals chan int | |
} | |
// NewExecutor creates a new Executor. | |
// 'maxWorkers' tells the maximum number of simultaneous goroutines. | |
// 'signals' channel can be used to control the Executor. | |
func NewExecutor(maxWorkers int, signals chan int) *Executor { | |
chanSize := 1000 | |
if maxWorkers > chanSize { | |
chanSize = maxWorkers | |
} | |
executor := Executor{ | |
maxWorkers: int64(maxWorkers), | |
Tasks: make(chan Task, chanSize), | |
Reports: make(chan Report, chanSize), | |
signals: signals, | |
} | |
go executor.launch() | |
return &executor | |
} | |
// launch starts the main loop for polling on the all the relevant channels and handling differents | |
// messages. | |
func (executor *Executor) launch() int { | |
reports := make(chan Report, executor.maxWorkers) | |
for { | |
select { | |
case signal := <-executor.signals: | |
if executor.handleSignals(signal) == 0 { | |
return 0 | |
} | |
case r := <-reports: | |
executor.addReport(r) | |
default: | |
if executor.ActiveWorkers < executor.maxWorkers && len(executor.Tasks) > 0 { | |
task := <-executor.Tasks | |
atomic.AddInt64(&executor.ActiveWorkers, 1) | |
go executor.launchWorker(task, reports) | |
} | |
} | |
} | |
} | |
// handleSignals is called whenever anything is received on the 'signals' channel. | |
// It performs the relevant task according to the received signal(request) and then responds either | |
// with 0 or 1 indicating whether the request was respected(0) or rejected(1). | |
func (executor *Executor) handleSignals(signal int) int { | |
if signal == 1 { | |
log.Println("Received termination request...") | |
if executor.Inactive() { | |
log.Println("No active workers, exiting...") | |
executor.signals <- 0 | |
return 0 | |
} | |
executor.signals <- 1 | |
log.Println("Some tasks are still active...") | |
} | |
return 1 | |
} | |
// launchWorker is called whenever a new Task is received and Executor can spawn more workers to spawn | |
// a new Worker. | |
// Each worker is launched on a new goroutine. It performs the given task and publishes the report on | |
// the Executor's internal reports channel. | |
func (executor *Executor) launchWorker(task Task, reports chan<- Report) { | |
report := task.Execute() | |
if len(reports) < cap(reports) { | |
reports <- report | |
} else { | |
log.Println("Executor's report channel is full...") | |
} | |
atomic.AddInt64(&executor.ActiveWorkers, -1) | |
} | |
// AddTask is used to submit a new task to the Executor is a non-blocking way. The Client can submit | |
// a new task using the Executor's tasks channel directly but that will block if the tasks channel is | |
// full. | |
// It should be considered that this method doesn't add the given task if the tasks channel is full | |
// and it is up to client to try again later. | |
func (executor *Executor) AddTask(task Task) bool { | |
if len(executor.Tasks) == cap(executor.Tasks) { | |
return false | |
} | |
executor.Tasks <- task | |
return true | |
} | |
// addReport is used by the Executor to publish the reports in a non-blocking way. It client is not | |
// reading the reports channel or is slower that the Executor publishing the reports, the Executor's | |
// reports channel is going to get full. In that case this method will not block and that report will | |
// not be added. | |
func (executor *Executor) addReport(report Report) bool { | |
if len(executor.Reports) == cap(executor.Reports) { | |
return false | |
} | |
executor.Reports <- report | |
return true | |
} | |
// Inactive checks if the Executor is idle. This happens when there are no pending tasks, active | |
// workers and reports to publish. | |
func (executor *Executor) Inactive() bool { | |
return executor.ActiveWorkers == 0 && len(executor.Tasks) == 0 && len(executor.Reports) == 0 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment