Skip to content

Instantly share code, notes, and snippets.

@velotiotech
Created June 11, 2020 09:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save velotiotech/d9ad95b74d09738e676c746e9c62d8d1 to your computer and use it in GitHub Desktop.
Save velotiotech/d9ad95b74d09738e676c746e9c62d8d1 to your computer and use it in GitHub Desktop.
Task executor example
package executor
import (
"log"
"sync/atomic"
)
// The Executor struct is the main executor for tasks.
// 'maxWorkers' represents the maximum number of simultaneous goroutines.
// 'ActiveWorkers' tells the number of active goroutines spawned by the Executor at given time.
// 'Tasks' is the channel on which the Executor receives the tasks.
// 'Reports' is channel on which the Executor publishes the every tasks reports.
// 'signals' is channel that can be used to control the executor. Right now, only the termination
// signal is supported which is essentially is sending '1' on this channel by the client.
type Executor struct {
maxWorkers int64
ActiveWorkers int64
Tasks chan Task
Reports chan Report
signals chan int
}
// NewExecutor creates a new Executor.
// 'maxWorkers' tells the maximum number of simultaneous goroutines.
// 'signals' channel can be used to control the Executor.
func NewExecutor(maxWorkers int, signals chan int) *Executor {
chanSize := 1000
if maxWorkers > chanSize {
chanSize = maxWorkers
}
executor := Executor{
maxWorkers: int64(maxWorkers),
Tasks: make(chan Task, chanSize),
Reports: make(chan Report, chanSize),
signals: signals,
}
go executor.launch()
return &executor
}
// launch starts the main loop for polling on the all the relevant channels and handling differents
// messages.
func (executor *Executor) launch() int {
reports := make(chan Report, executor.maxWorkers)
for {
select {
case signal := <-executor.signals:
if executor.handleSignals(signal) == 0 {
return 0
}
case r := <-reports:
executor.addReport(r)
default:
if executor.ActiveWorkers < executor.maxWorkers && len(executor.Tasks) > 0 {
task := <-executor.Tasks
atomic.AddInt64(&executor.ActiveWorkers, 1)
go executor.launchWorker(task, reports)
}
}
}
}
// handleSignals is called whenever anything is received on the 'signals' channel.
// It performs the relevant task according to the received signal(request) and then responds either
// with 0 or 1 indicating whether the request was respected(0) or rejected(1).
func (executor *Executor) handleSignals(signal int) int {
if signal == 1 {
log.Println("Received termination request...")
if executor.Inactive() {
log.Println("No active workers, exiting...")
executor.signals <- 0
return 0
}
executor.signals <- 1
log.Println("Some tasks are still active...")
}
return 1
}
// launchWorker is called whenever a new Task is received and Executor can spawn more workers to spawn
// a new Worker.
// Each worker is launched on a new goroutine. It performs the given task and publishes the report on
// the Executor's internal reports channel.
func (executor *Executor) launchWorker(task Task, reports chan<- Report) {
report := task.Execute()
if len(reports) < cap(reports) {
reports <- report
} else {
log.Println("Executor's report channel is full...")
}
atomic.AddInt64(&executor.ActiveWorkers, -1)
}
// AddTask is used to submit a new task to the Executor is a non-blocking way. The Client can submit
// a new task using the Executor's tasks channel directly but that will block if the tasks channel is
// full.
// It should be considered that this method doesn't add the given task if the tasks channel is full
// and it is up to client to try again later.
func (executor *Executor) AddTask(task Task) bool {
if len(executor.Tasks) == cap(executor.Tasks) {
return false
}
executor.Tasks <- task
return true
}
// addReport is used by the Executor to publish the reports in a non-blocking way. It client is not
// reading the reports channel or is slower that the Executor publishing the reports, the Executor's
// reports channel is going to get full. In that case this method will not block and that report will
// not be added.
func (executor *Executor) addReport(report Report) bool {
if len(executor.Reports) == cap(executor.Reports) {
return false
}
executor.Reports <- report
return true
}
// Inactive checks if the Executor is idle. This happens when there are no pending tasks, active
// workers and reports to publish.
func (executor *Executor) Inactive() bool {
return executor.ActiveWorkers == 0 && len(executor.Tasks) == 0 && len(executor.Reports) == 0
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment