Skip to content

Instantly share code, notes, and snippets.

@Brian61
Created March 19, 2015 09:40
Show Gist options
  • Save Brian61/d3870734fca53e2ac4cb to your computer and use it in GitHub Desktop.
Save Brian61/d3870734fca53e2ac4cb to your computer and use it in GitHub Desktop.
worker control for long lived goroutine nodes
package workers
// Commands commands for workers (extensible)
type Commands int
// Command types
const (
CommandNone Commands = iota
CommandReset
CommandStart
CommandNotifyBatchDone
)
type workerStatus int
const (
statusIdle = iota
statusReady
statusWorking
statusCompletingBatch
)
// Worker interface to worker goroutines
type Worker interface {
DoWork(batchDone bool, commandIn <-chan Commands) (done bool, relayedCommand Commands)
Reset()
Shutdown()
}
// WorkerNode embedded in worker struct
type WorkerNode struct {
commandIn <-chan Commands
commandOut []chan<- Commands
}
func (wn *WorkerNode) notify(command Commands) {
for _, cout := range wn.commandOut {
cout <- command
}
}
// SetCommandIn ---
func (wn *WorkerNode) SetCommandIn(cmdIn <-chan Commands) {
wn.commandIn = cmdIn
}
// AddCommandOut ----
func (wn *WorkerNode) AddCommandOut(cmdOut chan<- Commands) {
wn.commandOut = append(wn.commandOut, cmdOut)
}
// Run -- run the worker
// worker output channels *must* be buffered and
// workers should not use blocking reads on input channels
func (wn *WorkerNode) Run(worker Worker) {
status := statusIdle
command := CommandNone
mainloop:
for {
if command == CommandNone {
if status == statusIdle || status == statusReady {
cmd, ok := <-wn.commandIn
if !ok {
break mainloop
}
command = cmd
} else {
select {
case cmd, ok := <-wn.commandIn:
{
if !ok {
break mainloop
}
command = cmd
}
default:
command = CommandNone
}
}
}
skipNotify := false
switch command {
case CommandReset:
{
worker.Reset()
status = statusReady
}
case CommandStart:
{
status = statusWorking
}
case CommandNotifyBatchDone:
{
status = statusCompletingBatch
skipNotify = true
}
default:
skipNotify = true
}
if !skipNotify {
wn.notify(command)
}
if status == statusWorking || status == statusCompletingBatch {
batchDone := status == statusCompletingBatch
workerDone, relayedCommand := worker.DoWork(batchDone, wn.commandIn)
if workerDone && batchDone {
wn.notify(CommandNotifyBatchDone)
status = statusIdle
}
command = relayedCommand
} else {
command = CommandNone
}
}
for _, cmdOut := range wn.commandOut {
close(cmdOut)
}
worker.Shutdown()
return
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment