Last active
November 21, 2023 13:25
-
-
Save omerkaya1/714dd23e3aea9b831988fb307c137e6f to your computer and use it in GitHub Desktop.
Generic sync.Cond-based data collector (Go ^1.20)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package collector | |
import ( | |
"sync" | |
) | |
// Collector feeds Job values to a set of workers and gathers their Result
// values. An internal job counter lets Wait block until every job of the
// current batch has been reported back through Collect or Done.
//
// A Collector must be created with NewCollector; the zero value is not usable,
// and a Collector must not be copied after first use. It is meant to be
// re-used for successive batches:
//  1. Enqueue a batch.
//  2. Let workers consume Output.
//  3. Call Wait, then read Results.
//  4. Repeat.
//
// Batches must not overlap, because Enqueue resets both the job counter and
// the results buffer.
type Collector[Result, Job any] struct {
	output   chan Job
	buffer   []Result
	stop     chan struct{}
	cnd      *sync.Cond
	jobCount int

	// producers tracks the goroutines started by Enqueue. Close waits for them
	// to stop before closing output, because sending on a closed channel panics.
	producers sync.WaitGroup
}

// NewCollector returns a ready-to-use Collector. n is used both as the
// capacity of the Output channel and as the initial capacity of the results
// buffer. n must not be negative, because make panics otherwise.
func NewCollector[Result, Job any](n int) *Collector[Result, Job] {
	return &Collector[Result, Job]{
		cnd:    sync.NewCond(new(sync.Mutex)),
		buffer: make([]Result, 0, n),
		output: make(chan Job, n),
		stop:   make(chan struct{}),
	}
}

// Close shuts the Collector down. It:
//   - resets the job counter and releases any goroutine blocked in Wait;
//   - stops the background deliveries started by Enqueue;
//   - closes the Output channel, so that workers ranging over it terminate.
//
// Close may be called more than once; later calls return immediately. Once
// Close has been called, Enqueue does nothing.
func (c *Collector[Result, Job]) Close() {
	c.cnd.L.Lock()
	if c.isClosed() {
		c.cnd.L.Unlock()
		return
	}
	close(c.stop)
	c.jobCount = 0
	c.cnd.Broadcast()
	c.cnd.L.Unlock()

	// Producers observe the closed stop channel and return. Only after that is
	// it safe to close output, since nobody can still be sending on it.
	c.producers.Wait()
	close(c.output)
}

// isClosed reports whether Close has started shutting the Collector down.
// stop is only ever closed while c.cnd.L is held, so the caller must hold
// that lock for the answer to stay valid.
func (c *Collector[Result, Job]) isClosed() bool {
	select {
	case <-c.stop:
		return true
	default:
		return false
	}
}

// Enqueue starts a new batch made of items. It:
//   - sets the job counter, which Wait blocks on, to len(items);
//   - empties the results buffer while keeping its capacity;
//   - sends the items on the Output channel, in order, from a background
//     goroutine, so Enqueue itself never blocks on a full channel.
//
// The background goroutine exits when every item has been received or when
// Close is called. If workers stop reading Output and Close is never called,
// it stays blocked.
//
// That goroutine reads items after Enqueue returns, so the caller must not
// modify the slice until the batch has been consumed. Calling Enqueue after
// Close does nothing.
func (c *Collector[Result, Job]) Enqueue(items ...Job) {
	c.cnd.L.Lock()
	defer c.cnd.L.Unlock()

	if c.isClosed() {
		return
	}
	c.jobCount = len(items)
	c.buffer = c.buffer[:0]
	if len(items) == 0 {
		return
	}

	// Register under the lock. Close closes stop under this same lock before
	// calling producers.Wait, so Close either sees this producer or has
	// already stopped it from starting.
	c.producers.Add(1)
	go func() {
		defer c.producers.Done()
		for _, item := range items {
			select {
			case <-c.stop:
				return
			case c.output <- item:
			}
		}
	}()
}
// Output returns the channel of entries that independent workers feed on. | |
func (c *Collector[Result, Job]) Output() <-chan Job { | |
return c.output | |
} | |
// Collect stores processed entry in the Collector records buffer and decrements the Collector.count value. | |
func (c *Collector[Result, Job]) Collect(d Result) { | |
c.cnd.L.Lock() | |
c.buffer = append(c.buffer, d) | |
c.jobCount-- | |
c.cnd.Broadcast() | |
c.cnd.L.Unlock() | |
} | |
// Done is used to decrement the Collector's internal counter. | |
// Should be used in situations where the work was done but there is nothing to add to the Collector's buffer. | |
func (c *Collector[Result, Job]) Done() { | |
c.cnd.L.Lock() | |
c.jobCount-- | |
c.cnd.Broadcast() | |
c.cnd.L.Unlock() | |
} | |
// Results provides access to the Collector's buffer. | |
func (c *Collector[Result, Job]) Results() []Result { | |
return c.buffer | |
} | |
// Wait a synchronization point between workers feeding on the output channel and results collection. | |
// Should be called after Collector.Enqueue. | |
func (c *Collector[Result, Job]) Wait() { | |
c.cnd.L.Lock() | |
for c.jobCount > 0 { | |
c.cnd.Wait() | |
} | |
c.cnd.Broadcast() | |
c.cnd.L.Unlock() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment