Skip to content

Instantly share code, notes, and snippets.

@oxtoacart
Last active August 29, 2015 14:12
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save oxtoacart/fd6175cc3b188eac38c4 to your computer and use it in GitHub Desktop.
Save oxtoacart/fd6175cc3b188eac38c4 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"math/rand"
"sort"
"time"
)
// Simulation parameters.
const (
workers = 10 // number of worker goroutines main spawns, one per st
iters = 100 // number of work iterations each worker runs; a worker's count ends at this value
)
// st pairs a counter with the channel its worker uses to report new counts.
type st struct {
	// counter is the most recently recorded count for this struct.
	counter int
	// updates carries counts from the worker to its own updater goroutine.
	updates chan int
}

// String renders the counter right-aligned in a field at least three
// characters wide, so printed slices of st values line up.
func (s *st) String() string {
	const width = 3
	return fmt.Sprintf("%*d", width, s.counter)
}
// update is the message sent on the shared updates channel: a count together
// with the st it was reported for.
type update struct {
s *st // the st this count belongs to
count int // the reported count; main stores it in s.counter
}
// main spawns the workers and then acts as the single consumer of their
// updates: it records each reported count, re-sorts the structs by counter and
// prints the resulting order. It returns once every worker has delivered its
// final count.
func main() {
	// Make a channel to receive updates from all workers.
	updates := make(chan update)

	// Make a bunch of structs and spawn workers to work on them.
	sts := make([]*st, 0, workers)
	for i := 0; i < workers; i++ {
		s := &st{0, make(chan int, 1)}
		sts = append(sts, s)
		go s.work(updates)
	}

	// Here's the crux of the solution - use a single goroutine to both handle
	// updating the counts as well as doing the "scheduling" work.
	//
	// A worker's counts only increase and its last update always carries
	// count == iters (see work), so each worker produces exactly one such
	// update. Counting them tells us when nobody will send again; receiving
	// unconditionally past that point would block forever and make the runtime
	// abort with "all goroutines are asleep - deadlock!".
	for finished := 0; finished < workers; {
		u := <-updates
		u.s.counter = u.count
		if u.count == iters {
			finished++
		}
		sort.Sort(ByCounter(sts))
		fmt.Printf("Scheduling with: %v\n", sts)
		// Sleep to simulate slow scheduler
		time.Sleep(20 * time.Millisecond)
	}
}
// work simulates a worker. It starts the struct's updater goroutine, then runs
// iters rounds of "work" (a sleep of a fixed, randomly chosen 1-10ms) and
// offers the running count on s.updates after every round. Offers never block:
// when the channel is full the count is simply superseded by a later one. The
// final count is always delivered before s.updates is closed.
func (s *st) work(updates chan update) {
	go s.sendUpdates(updates)

	// How long one round of simulated work takes; picked once per worker.
	delay := time.Millisecond * time.Duration(1+rand.Intn(10))

	done := 0
	pending := true // true while the current value of done has not been handed to s.updates
	for done < iters {
		// do some work
		time.Sleep(delay)
		done++

		// Hand over the new count only if there is room right now; otherwise
		// keep accumulating and try again after the next round.
		select {
		case s.updates <- done:
			pending = false
		default:
			pending = true
		}
	}

	// Make sure the last count is not lost; this send may block until the
	// updater goroutine frees the slot.
	if pending {
		s.updates <- done
	}

	fmt.Println("Closing")
	close(s.updates)
}
// sendUpdates runs in its own goroutine, separate from work, and relays every
// count received on s.updates to the shared updates channel, tagged with s.
// Because s.updates is buffered, a worker can park its latest count there and
// keep working while this goroutine waits for its turn on the shared channel,
// so every worker gets a chance to be selected no matter how fast the others
// are. The method returns once s.updates has been closed and drained.
func (s *st) sendUpdates(updates chan update) {
	for count := range s.updates {
		updates <- update{s: s, count: count}
	}
}
// ByCounter implements sort.Interface over a slice of *st, ordering the
// elements by ascending counter.
type ByCounter []*st

// Len reports the number of elements.
func (b ByCounter) Len() int {
	return len(b)
}

// Swap exchanges the elements at positions i and j.
func (b ByCounter) Swap(i, j int) {
	tmp := b[i]
	b[i] = b[j]
	b[j] = tmp
}

// Less reports whether the element at i has a smaller counter than the one at j.
func (b ByCounter) Less(i, j int) bool {
	left, right := b[i], b[j]
	return right.counter > left.counter
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment