Created
December 26, 2014 15:57
-
-
Save oxtoacart/62869d732db06a9e1766 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"fmt" | |
"math/rand" | |
"reflect" | |
"sort" | |
"time" | |
) | |
// workers is how many worker goroutines get spawned.
const workers = 10

// iters is how many iterations of work each worker performs.
const iters = 100
// st pairs a counter with the channel on which new values for that counter
// are delivered.
type st struct {
	counter int      // most recently recorded count
	updates chan int // carries updated counts; closed by the worker when it is done
}

// String formats the counter as a decimal number, right-aligned in a field
// that is at least three characters wide.
func (s *st) String() string {
	const width = 3
	return fmt.Sprintf("%*d", width, s.counter)
}
func main() { | |
// Make a bunch of structs and spawn workers to work on them | |
// Also create cases for a dynamic select statement (used later) | |
sts := make([]*st, 0, workers) | |
allSts := make([]*st, 0, workers) | |
cases := make([]reflect.SelectCase, 0, workers) | |
// Add a default case as the first | |
// cases = append(cases, reflect.SelectCase{ | |
// Dir: reflect.SelectDefault, | |
// }) | |
for i := 0; i < workers; i++ { | |
// Note - we buffer the updates channel for each worker so that it has | |
// an opportunity to contribute in the select statement below. | |
s := &st{0, make(chan int, 1)} | |
sts = append(sts, s) | |
allSts = append(allSts, s) | |
cases = append(cases, reflect.SelectCase{ | |
Dir: reflect.SelectRecv, | |
Chan: reflect.ValueOf(s.updates), | |
}) | |
go work(s) | |
} | |
// Here's the crux of the solution - use a single goroutine to both handle | |
// updating the counts as well as doing the "scheduling" work. | |
for { | |
// Process the next update, but only if available. That's what select | |
// with a default clause accomplishes. | |
// We use a dynamic select to let the scheduler pick from the next | |
// available worker. | |
chosen, val, ok := reflect.Select(cases) | |
if !ok { | |
// Worker finished, remove it | |
if len(sts) == 1 { | |
// That was the last worker | |
fmt.Println("Finished") | |
break | |
} | |
switch chosen { | |
case 0: | |
sts = sts[1:] | |
cases = cases[1:] | |
case len(sts) - 1: | |
sts = sts[:len(sts)] | |
cases = cases[:len(cases)] | |
default: | |
newSts := make([]*st, len(sts)-1) | |
newCases := make([]reflect.SelectCase, len(cases)-1) | |
copy(newSts[:chosen], sts[:chosen]) | |
copy(newSts[chosen:], sts[chosen+1:]) | |
copy(newCases[:chosen], cases[:chosen]) | |
copy(newCases[chosen:], cases[chosen+1:]) | |
sts = newSts | |
cases = newCases | |
} | |
continue | |
} | |
count := int(val.Int()) | |
if count == 0 { | |
continue | |
} | |
s := sts[chosen] | |
s.counter = count | |
sort.Sort(ByCounter(allSts)) | |
fmt.Printf("Scheduling with: %v\n", allSts) | |
} | |
} | |
// work simulates a worker by looping, sleeping in each loop and submitting an | |
// updated count. The key to this is that it uses the update struct as an | |
// accumulator. | |
func work(s *st) { | |
workTime := time.Duration(rand.Intn(10)+1) * time.Millisecond // the amount of time that simulated work takes | |
for i := 0; i < iters; i++ { | |
// do some work | |
time.Sleep(workTime) | |
count := i + 1 | |
// Submit the updated count, but only if there's room on the updates | |
// channel. Otherwise, just continue accumulating. | |
select { | |
case s.updates <- count: | |
// successfully submitted update | |
default: | |
last := i == iters-1 | |
if last { | |
// wait for the update to submit | |
s.updates <- count | |
} | |
} | |
} | |
fmt.Println("Closing") | |
close(s.updates) | |
} | |
type ByCounter []*st | |
func (a ByCounter) Len() int { return len(a) } | |
func (a ByCounter) Swap(i, j int) { a[i], a[j] = a[j], a[i] } | |
func (a ByCounter) Less(i, j int) bool { return a[i].counter < a[j].counter } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment