@freeformz
Last active December 25, 2015 20:39
// Problem...
// I want to do bulk processing against a paginated API.
// Spawning a goroutine per page DoSes the site, so I need throttling.
// Instead, send the offsets to a channel and spawn a fixed number of
// workers to process the pages.
//
// I can't just use 2 channels, because I want to easily consume
// the output and signal done with close(output), and closing a
// channel twice causes a panic. I also don't know how many responses
// I'm going to get out, so I can't consume a fixed amount.
//
// A WaitGroup alone doesn't work either (AFAICT): something still has
// to close the output channel exactly once after all workers finish.
type FannyPack struct {
	sync.WaitGroup
	sync.Once
	Out chan string
}

func NewFannyPack() *FannyPack {
	return &FannyPack{Out: make(chan string)}
}
// NewChannel returns a fresh output channel for one worker. Each worker
// closes its own channel when it finishes; once every worker channel has
// drained, Out is closed exactly once.
func (fp *FannyPack) NewChannel() chan string {
	newOut := make(chan string)
	fp.Add(1)
	// Do comes from the embedded sync.Once: only the first call spawns
	// the goroutine that eventually closes Out.
	fp.Do(func() {
		go func() {
			fp.WaitGroup.Wait()
			close(fp.Out)
		}()
	})
	// Fan in to the Out channel.
	go func() {
		defer fp.Done()
		for s := range newOut {
			fp.Out <- s
		}
	}()
	return newOut
}
// DoWork is a sample worker.
func DoWork(in chan int, out chan string) {
	for offset := range in {
		// do work: fetch and process the page at this offset
		_ = offset
		out <- "the results"
	}
	// I won't get here until in is closed.
	close(out) // I'm done, so close my out channel
}
// Put it all together....
inputChannel := make(chan int)
fp := NewFannyPack()

// Spawn 24 workers.
for i := 0; i < 24; i++ {
	go DoWork(inputChannel, fp.NewChannel())
}

go func() {
	// Feed the batch offsets to the workers, then close the channel,
	// so once the workers have pulled everything off the channel and
	// processed it, they exit as well. In my use case pagination is
	// 100 items and total is the total number of items.
	for i := 0; i <= (total / 100); i++ {
		inputChannel <- i * 100
	}
	close(inputChannel)
}()

// Runs until it has processed all the results from the workers.
for results := range fp.Out {
	// do something with results
	_ = results
}