Skip to content

Instantly share code, notes, and snippets.

@oxtoacart
Created December 26, 2014 15:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save oxtoacart/62869d732db06a9e1766 to your computer and use it in GitHub Desktop.
Save oxtoacart/62869d732db06a9e1766 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"math/rand"
"reflect"
"sort"
"time"
)
const (
workers = 10 // how many worker goroutines to spawn
iters = 100 // how many iterations of work each worker does
)
// struct that holds a counter
type st struct {
counter int
updates chan int
}
func (s *st) String() string {
return fmt.Sprintf("%3d", s.counter)
}
func main() {
// Make a bunch of structs and spawn workers to work on them
// Also create cases for a dynamic select statement (used later)
sts := make([]*st, 0, workers)
allSts := make([]*st, 0, workers)
cases := make([]reflect.SelectCase, 0, workers)
// Add a default case as the first
// cases = append(cases, reflect.SelectCase{
// Dir: reflect.SelectDefault,
// })
for i := 0; i < workers; i++ {
// Note - we buffer the updates channel for each worker so that it has
// an opportunity to contribute in the select statement below.
s := &st{0, make(chan int, 1)}
sts = append(sts, s)
allSts = append(allSts, s)
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(s.updates),
})
go work(s)
}
// Here's the crux of the solution - use a single goroutine to both handle
// updating the counts as well as doing the "scheduling" work.
for {
// Process the next update, but only if available. That's what select
// with a default clause accomplishes.
// We use a dynamic select to let the scheduler pick from the next
// available worker.
chosen, val, ok := reflect.Select(cases)
if !ok {
// Worker finished, remove it
if len(sts) == 1 {
// That was the last worker
fmt.Println("Finished")
break
}
switch chosen {
case 0:
sts = sts[1:]
cases = cases[1:]
case len(sts) - 1:
sts = sts[:len(sts)]
cases = cases[:len(cases)]
default:
newSts := make([]*st, len(sts)-1)
newCases := make([]reflect.SelectCase, len(cases)-1)
copy(newSts[:chosen], sts[:chosen])
copy(newSts[chosen:], sts[chosen+1:])
copy(newCases[:chosen], cases[:chosen])
copy(newCases[chosen:], cases[chosen+1:])
sts = newSts
cases = newCases
}
continue
}
count := int(val.Int())
if count == 0 {
continue
}
s := sts[chosen]
s.counter = count
sort.Sort(ByCounter(allSts))
fmt.Printf("Scheduling with: %v\n", allSts)
}
}
// work simulates a worker by looping, sleeping in each loop and submitting an
// updated count. The key to this is that it uses the update struct as an
// accumulator.
func work(s *st) {
workTime := time.Duration(rand.Intn(10)+1) * time.Millisecond // the amount of time that simulated work takes
for i := 0; i < iters; i++ {
// do some work
time.Sleep(workTime)
count := i + 1
// Submit the updated count, but only if there's room on the updates
// channel. Otherwise, just continue accumulating.
select {
case s.updates <- count:
// successfully submitted update
default:
last := i == iters-1
if last {
// wait for the update to submit
s.updates <- count
}
}
}
fmt.Println("Closing")
close(s.updates)
}
type ByCounter []*st
func (a ByCounter) Len() int { return len(a) }
func (a ByCounter) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByCounter) Less(i, j int) bool { return a[i].counter < a[j].counter }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment