Skip to content

Instantly share code, notes, and snippets.

@elliotchance
Created July 24, 2019 06:36
Show Gist options
  • Save elliotchance/f3c4a2818f86ce2ed0c774f5fe263279 to your computer and use it in GitHub Desktop.
Save elliotchance/f3c4a2818f86ce2ed0c774f5fe263279 to your computer and use it in GitHub Desktop.
package main
import (
"fmt"
"time"
)
// BatchStrings reads strings from values and groups them into slices that are
// delivered on the returned channel.
//
// A batch is emitted when one of the following happens, whichever comes first:
//   - it holds maxItems values (only when maxItems > 0; a non-positive
//     maxItems disables the size limit, so batches end on timeout or close),
//   - maxTimeout has elapsed since the batch window was opened, or
//   - values is closed, in which case the partial batch is flushed.
//
// Empty batches are never sent. The returned channel is unbuffered and is
// closed once values has been closed and the final batch delivered, so the
// caller should keep receiving until it is closed; otherwise the internal
// goroutine stays blocked on the send.
func BatchStrings(values <-chan string, maxItems int, maxTimeout time.Duration) chan []string {
	batches := make(chan []string)

	go func() {
		defer close(batches)

		for {
			batch, open := nextStringBatch(values, maxItems, maxTimeout)
			if len(batch) > 0 {
				batches <- batch
			}
			if !open {
				return
			}
		}
	}()

	return batches
}

// nextStringBatch collects a single batch window from values. It returns the
// collected items (possibly none) and whether values is still open.
func nextStringBatch(values <-chan string, maxItems int, maxTimeout time.Duration) ([]string, bool) {
	// An explicit timer (rather than time.After) lets us release it as soon as
	// the window ends, instead of leaving it pending until it would have fired.
	window := time.NewTimer(maxTimeout)
	defer window.Stop()

	var batch []string
	for {
		select {
		case value, ok := <-values:
			if !ok {
				return batch, false
			}
			batch = append(batch, value)
			if maxItems > 0 && len(batch) >= maxItems {
				return batch, true
			}
		case <-window.C:
			return batch, true
		}
	}
}
// main demonstrates BatchStrings: a producer goroutine feeds words with
// pauses between them, and each received batch is printed together with the
// time elapsed since the start.
func main() {
	words := make(chan string)

	go func() {
		// Closing the input lets BatchStrings flush the last batch and finish.
		defer close(words)

		send := func(items ...string) {
			for _, item := range items {
				words <- item
			}
		}

		send("hello", "there") // hit limit of 2
		send("how")
		time.Sleep(15 * time.Millisecond) // hit timeout
		send("are", "you")                // hit limit of 2

		// A really long time without any new values.
		time.Sleep(500 * time.Millisecond)
		send("doing?") // the last incomplete batch
	}()

	start := time.Now()
	for batch := range BatchStrings(words, 2, 10*time.Millisecond) {
		fmt.Println(time.Since(start), batch)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment