Last active
December 25, 2015 20:39
-
-
Save freeformz/7036743 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Problem:
// I want to do bulk processing against a paginated API.
// Spawning a goroutine per page DoSes the site, so I need throttling.
// Instead, send the page offsets to a channel and spawn a fixed number of
// workers to process the pages.
//
// I can't just use two channels, because I want to easily consume the
// output and signal completion with close(output), and closing a channel
// twice causes a panic (so the workers can't each close one shared output).
// I also don't know how many responses I'm going to get, so I can't
// consume a fixed number of them.
//
// Wait groups alone also don't work (AFAICT).
// FannyPack fans in the values from any number of producer channels onto the
// single channel Out. Out is closed once every producer channel obtained from
// NewChannel has been closed and all of its values have been forwarded.
//
// The embedded WaitGroup counts the producer channels that are still being
// forwarded, and the embedded Once guards the start of the goroutine that
// closes Out. Because both are embedded, their methods (Add, Done, Wait, Do)
// are promoted onto FannyPack. Callers should not use them directly, or Out
// may be closed at the wrong time.
type FannyPack struct {
	sync.WaitGroup
	sync.Once
	Out chan string
}

// NewFannyPack returns a FannyPack with an unbuffered Out channel, ready to
// be ranged over.
func NewFannyPack() *FannyPack {
	fp := new(FannyPack)
	fp.Out = make(chan string)
	return fp
}

// NewChannel registers one more producer and returns the channel that producer
// sends its results on. Every value sent on the returned channel is forwarded
// to Out. The producer must close the returned channel when it is finished.
//
// The first call starts a goroutine that waits until all registered channels
// have been closed and drained, and then closes Out. Consequently, every call
// to NewChannel must happen before any previously returned channel can finish
// (for example, create all workers before feeding them any input). Otherwise
// Out may already be closed, and forwarding from a late channel would panic
// with a send on a closed channel.
func (fp *FannyPack) NewChannel() chan string {
	producer := make(chan string)

	// Count this producer before either goroutine below can run, so the
	// closer cannot see a zero count on its account.
	fp.Add(1)

	// Do is promoted from the embedded sync.Once: only the first call
	// launches the closer; later calls do nothing.
	fp.Do(func() {
		go fp.closeOutWhenDrained()
	})

	// Fan in: copy this producer's values onto Out until it is closed.
	go fp.forward(producer)

	return producer
}

// closeOutWhenDrained blocks until every registered producer has been fully
// forwarded, then closes Out.
func (fp *FannyPack) closeOutWhenDrained() {
	fp.WaitGroup.Wait()
	close(fp.Out)
}

// forward copies every value from src onto Out and marks the producer done
// once src has been closed.
func (fp *FannyPack) forward(src chan string) {
	defer fp.Done()
	for v := range src {
		fp.Out <- v
	}
}
// DoWork is a sample worker. It receives page offsets from in until in is
// closed, sends one result on out per offset received, and then closes out to
// signal that it has finished. Because DoWork closes out, nothing else may
// close it.
func DoWork(in chan int, out chan string) {
	// This placeholder ignores the offset itself; a real worker would fetch
	// and process the page starting at that offset here. Using `for range`
	// avoids an unused loop variable, which Go rejects at compile time.
	for range in {
		out <- "the results"
	}
	// Reached only after in has been closed and drained.
	close(out)
}
//Put it all together.... | |
inputChannel := make(chan int) | |
fp := NewFannyPack() | |
// Spawn 24 workers | |
for i := 0; i < 24; i++ { | |
go DoWork(inputChannel, fp.NewChannel()) | |
} | |
go func() { | |
// This feeds my batches to the workers | |
// Then closes the channel | |
// so when the workers have pulled everything off the | |
// channel and processed it, they will exit as well | |
// In my use case pagination is 100 items | |
// and total is the total number of items | |
for i := 0; i <= (total / 100); i++ { | |
inputChannel <- (i * 100) | |
} | |
close(inputChannel) | |
}() | |
for results := range fp.Out { | |
// do something with results | |
// runs until it's processed all the results from the workers | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment