Created
March 19, 2015 09:40
-
-
Save Brian61/d3870734fca53e2ac4cb to your computer and use it in GitHub Desktop.
Worker control for long-lived goroutine nodes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package workers | |
// Commands identifies a control command delivered to a worker node over its
// command channel. The set of commands is meant to be extensible.
type Commands int

// Command types.
const (
	// CommandNone means no command is pending. Run never relays it downstream.
	CommandNone Commands = iota
	// CommandReset makes Run call Worker.Reset, mark the node ready, and
	// relay the command to the node's output channels.
	CommandReset
	// CommandStart makes Run switch the node to the working state and relay
	// the command to the node's output channels.
	CommandStart
	// CommandNotifyBatchDone makes Run switch the node to the
	// completing-batch state. It is not relayed immediately; Run forwards it
	// downstream only once DoWork reports done while completing the batch.
	CommandNotifyBatchDone
)
// workerStatus is the lifecycle state Run tracks for a worker node.
type workerStatus int

// Worker node states. The constants are explicitly typed as workerStatus so
// the compiler rejects accidental mixing with plain integers or with
// Commands values (previously they were untyped ints and workerStatus was
// never actually used).
const (
	// statusIdle is the initial state; Run blocks waiting for a command.
	statusIdle workerStatus = iota
	// statusReady is entered after a reset; Run blocks waiting for a command.
	statusReady
	// statusWorking means DoWork is being called with batchDone == false.
	statusWorking
	// statusCompletingBatch means DoWork is being called with batchDone == true.
	statusCompletingBatch
)
// Worker is the interface a worker goroutine implements so that
// WorkerNode.Run can drive it.
type Worker interface {
	// DoWork is called by Run repeatedly while the node is working or
	// completing a batch. batchDone is true once the node has received
	// CommandNotifyBatchDone; commandIn is the node's own command channel.
	// When done is returned true while batchDone is true, Run forwards
	// CommandNotifyBatchDone downstream and the node becomes idle.
	// relayedCommand is handled by Run as the next command; return
	// CommandNone when there is nothing to relay.
	DoWork(batchDone bool, commandIn <-chan Commands) (done bool, relayedCommand Commands)
	// Reset is called by Run when CommandReset is received.
	Reset()
	// Shutdown is called by Run once, after the command channel has been
	// closed and the node's output channels have been closed.
	Shutdown()
}
// WorkerNode holds the command channels of a worker goroutine and is meant
// to be embedded in a worker struct.
type WorkerNode struct {
	// commandIn is the channel Run receives commands from (see SetCommandIn).
	commandIn <-chan Commands
	// commandOut lists the channels commands are relayed to (see
	// AddCommandOut). Run closes every one of them when it exits.
	commandOut []chan<- Commands
}
func (wn *WorkerNode) notify(command Commands) { | |
for _, cout := range wn.commandOut { | |
cout <- command | |
} | |
} | |
// SetCommandIn sets the channel from which Run receives commands, replacing
// any previously set channel.
func (wn *WorkerNode) SetCommandIn(cmdIn <-chan Commands) {
	wn.commandIn = cmdIn
}
// AddCommandOut registers an additional output channel. Relayed commands are
// sent to every registered channel, and Run closes them all when it exits.
func (wn *WorkerNode) AddCommandOut(cmdOut chan<- Commands) {
	wn.commandOut = append(wn.commandOut, cmdOut)
}
// Run -- run the worker | |
// worker output channels *must* be buffered and | |
// workers should not use blocking reads on input channels | |
func (wn *WorkerNode) Run(worker Worker) { | |
status := statusIdle | |
command := CommandNone | |
mainloop: | |
for { | |
if command == CommandNone { | |
if status == statusIdle || status == statusReady { | |
cmd, ok := <-wn.commandIn | |
if !ok { | |
break mainloop | |
} | |
command = cmd | |
} else { | |
select { | |
case cmd, ok := <-wn.commandIn: | |
{ | |
if !ok { | |
break mainloop | |
} | |
command = cmd | |
} | |
default: | |
command = CommandNone | |
} | |
} | |
} | |
skipNotify := false | |
switch command { | |
case CommandReset: | |
{ | |
worker.Reset() | |
status = statusReady | |
} | |
case CommandStart: | |
{ | |
status = statusWorking | |
} | |
case CommandNotifyBatchDone: | |
{ | |
status = statusCompletingBatch | |
skipNotify = true | |
} | |
default: | |
skipNotify = true | |
} | |
if !skipNotify { | |
wn.notify(command) | |
} | |
if status == statusWorking || status == statusCompletingBatch { | |
batchDone := status == statusCompletingBatch | |
workerDone, relayedCommand := worker.DoWork(batchDone, wn.commandIn) | |
if workerDone && batchDone { | |
wn.notify(CommandNotifyBatchDone) | |
status = statusIdle | |
} | |
command = relayedCommand | |
} else { | |
command = CommandNone | |
} | |
} | |
for _, cmdOut := range wn.commandOut { | |
close(cmdOut) | |
} | |
worker.Shutdown() | |
return | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment