Skip to content

Instantly share code, notes, and snippets.

@omerkaya1
Last active November 21, 2023 13:25
Show Gist options
  • Save omerkaya1/714dd23e3aea9b831988fb307c137e6f to your computer and use it in GitHub Desktop.
Save omerkaya1/714dd23e3aea9b831988fb307c137e6f to your computer and use it in GitHub Desktop.
Generic sync.Cond-based data collector (Go ^1.20)
package collector
import (
"sync"
)
// Collector feeds Job values to workers through a channel and gathers their
// Result values into an internal buffer. A job counter guarded by a sync.Cond
// tells Wait when every enqueued job has been accounted for.
//
// A Collector is meant to be re-used across rounds:
//   - Enqueue starts a round.
//   - Workers report each job through Collect or Done.
//   - Wait blocks until the round finishes.
//   - Results returns what was gathered.
//
// Close ends the Collector's life. It releases Wait callers, stops pending
// feeding, and closes the Output channel so workers ranging over it terminate.
//
// Create a Collector with NewCollector; it must not be copied.
type Collector[Result, Job any] struct {
	output   chan Job      // jobs handed to workers; closed by Close
	buffer   []Result      // current round's results; guarded by cnd.L
	stop     chan struct{} // closed by Close to make feeder goroutines return
	cnd      *sync.Cond    // cnd.L guards buffer, jobCount and closed
	jobCount int           // jobs of the current round not yet reported

	closed  bool           // set once Close has run; guarded by cnd.L
	feeders sync.WaitGroup // Enqueue goroutines that may still send on output
}

// NewCollector returns a ready-to-use Collector.
//
// n is the capacity of both the result buffer and the Output channel. A
// negative n is treated as 0 (an unbuffered Output channel) instead of
// panicking.
func NewCollector[Result, Job any](n int) *Collector[Result, Job] {
	if n < 0 {
		n = 0
	}
	return &Collector[Result, Job]{
		cnd:    &sync.Cond{L: new(sync.Mutex)},
		buffer: make([]Result, 0, n),
		output: make(chan Job, n),
		stop:   make(chan struct{}),
	}
}

// Close shuts the Collector down. It does three things, in order:
//   - zeroes the job counter and wakes every Wait caller;
//   - tells in-flight Enqueue feeders to stop;
//   - once those feeders have returned, closes the Output channel.
//
// Jobs already buffered in Output can still be received until drained. Calls
// after the first are no-ops.
func (c *Collector[Result, Job]) Close() {
	c.cnd.L.Lock()
	if c.closed {
		c.cnd.L.Unlock()
		return
	}
	c.closed = true
	c.jobCount = 0
	close(c.stop)
	c.cnd.Broadcast()
	c.cnd.L.Unlock()

	// output may only be closed once no feeder can send on it. Closing it
	// earlier lets a feeder's select choose the send case on the closed
	// channel, which panics. No new feeder can start now that closed is set.
	c.feeders.Wait()
	close(c.output)
}

// Enqueue starts a new round with the given jobs. When called:
//   - the job counter is set to len(items);
//   - the result buffer is emptied, reusing its storage;
//   - the jobs are sent to Output from a separate goroutine, so Enqueue does
//     not block even when len(items) exceeds Output's capacity.
//
// Enqueue after Close is a no-op. Starting a round before the previous one
// has finished overwrites the previous job counter.
//
// The items slice is read by the background goroutine. Callers must not
// modify it until its jobs have been received from Output.
func (c *Collector[Result, Job]) Enqueue(items ...Job) {
	c.cnd.L.Lock()
	defer c.cnd.L.Unlock()
	if c.closed {
		return
	}
	c.jobCount = len(items)
	c.buffer = c.buffer[:0]
	if c.jobCount == 0 {
		// No Collect or Done will follow, so release anyone already in Wait.
		c.cnd.Broadcast()
		return
	}

	c.feeders.Add(1)
	go func() {
		defer c.feeders.Done()
		for _, item := range items {
			// Prefer stopping over sending when Close has already run,
			// even if output still has free buffer space.
			select {
			case <-c.stop:
				return
			default:
			}
			select {
			case <-c.stop:
				return
			case c.output <- item:
			}
		}
	}()
}

// Output returns the channel of jobs that workers consume. Close closes it,
// so workers can simply range over it.
func (c *Collector[Result, Job]) Output() <-chan Job {
	return c.output
}

// Collect stores d in the result buffer and marks one job of the current
// round as finished.
func (c *Collector[Result, Job]) Collect(d Result) {
	c.cnd.L.Lock()
	defer c.cnd.L.Unlock()
	c.buffer = append(c.buffer, d)
	c.finishJobLocked()
}

// Done marks one job of the current round as finished without storing a
// result. Use it when a job produced nothing worth collecting.
func (c *Collector[Result, Job]) Done() {
	c.cnd.L.Lock()
	defer c.cnd.L.Unlock()
	c.finishJobLocked()
}

// finishJobLocked decrements the job counter. It wakes Wait callers once the
// counter reaches zero or below. The caller must hold c.cnd.L.
func (c *Collector[Result, Job]) finishJobLocked() {
	c.jobCount--
	// Wait only re-checks for jobCount <= 0, so intermediate decrements do
	// not need to wake anybody.
	if c.jobCount <= 0 {
		c.cnd.Broadcast()
	}
}

// Results returns a copy of the results collected so far in the current
// round. The returned slice is never nil. It stays valid after later rounds
// reuse the internal buffer.
func (c *Collector[Result, Job]) Results() []Result {
	c.cnd.L.Lock()
	defer c.cnd.L.Unlock()
	out := make([]Result, len(c.buffer))
	copy(out, c.buffer)
	return out
}

// Wait blocks until every job of the current round has been reported through
// Collect or Done, or until Close is called. Call it after Enqueue.
func (c *Collector[Result, Job]) Wait() {
	c.cnd.L.Lock()
	defer c.cnd.L.Unlock()
	for c.jobCount > 0 {
		c.cnd.Wait()
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment