Skip to content

Instantly share code, notes, and snippets.

@mzanibelli
Last active June 17, 2021 15:15
Show Gist options
  • Save mzanibelli/59b019bb3c1bca4ac2ad29648ba65831 to your computer and use it in GitHub Desktop.
Save mzanibelli/59b019bb3c1bca4ac2ad29648ba65831 to your computer and use it in GitHub Desktop.
diff --git a/pubsub/internal/scheduler/publish_scheduler.go b/pubsub/internal/scheduler/publish_scheduler.go
index e4ec5f831..106e2df94 100644
--- a/pubsub/internal/scheduler/publish_scheduler.go
+++ b/pubsub/internal/scheduler/publish_scheduler.go
@@ -41,6 +41,7 @@ type PublishScheduler struct {
mu sync.Mutex
bundlers map[string]*bundler.Bundler
outstanding map[string]int
+ flushing bool
keysMu sync.RWMutex
// keysWithErrors tracks ordering keys that cannot accept new messages.
@@ -115,13 +116,19 @@ func (s *PublishScheduler) Add(key string, item interface{}, size int) error {
<-s.workers
nlen := reflect.ValueOf(bundle).Len()
+
s.mu.Lock()
+ defer s.mu.Unlock()
+
+ if s.flushing {
+ return
+ }
+
s.outstanding[key] -= nlen
if s.outstanding[key] == 0 {
delete(s.outstanding, key)
delete(s.bundlers, key)
}
- s.mu.Unlock()
})
b.DelayThreshold = s.DelayThreshold
b.BundleCountThreshold = s.BundleCountThreshold
@@ -153,14 +160,30 @@ func (s *PublishScheduler) Add(key string, item interface{}, size int) error {
// blocks until all items have been flushed.
func (s *PublishScheduler) FlushAndStop() {
close(s.done)
+
+ s.mu.Lock()
+ s.flushing = true
+ s.mu.Unlock()
+
for _, b := range s.bundlers {
b.Flush()
}
+
+ s.mu.Lock()
+ s.flushing = false
+ s.bundlers = make(map[string]*bundler.Bundler)
+ s.outstanding = make(map[string]int)
+ s.mu.Unlock()
}
// Flush waits until all bundlers are sent.
func (s *PublishScheduler) Flush() {
var wg sync.WaitGroup
+
+ s.mu.Lock()
+ s.flushing = true
+ s.mu.Unlock()
+
for _, b := range s.bundlers {
wg.Add(1)
go func(b *bundler.Bundler) {
@@ -169,6 +192,12 @@ func (s *PublishScheduler) Flush() {
}(b)
}
wg.Wait()
+
+ s.mu.Lock()
+ s.flushing = false
+ s.bundlers = make(map[string]*bundler.Bundler)
+ s.outstanding = make(map[string]int)
+ s.mu.Unlock()
}
// IsPaused checks if the bundler associated with an ordering keys is
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment