Last active
June 17, 2021 15:15
-
-
Save mzanibelli/59b019bb3c1bca4ac2ad29648ba65831 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/pubsub/internal/scheduler/publish_scheduler.go b/pubsub/internal/scheduler/publish_scheduler.go | |
index e4ec5f831..106e2df94 100644 | |
--- a/pubsub/internal/scheduler/publish_scheduler.go | |
+++ b/pubsub/internal/scheduler/publish_scheduler.go | |
@@ -41,6 +41,7 @@ type PublishScheduler struct { | |
mu sync.Mutex | |
bundlers map[string]*bundler.Bundler | |
outstanding map[string]int | |
+ flushing bool | |
keysMu sync.RWMutex | |
// keysWithErrors tracks ordering keys that cannot accept new messages. | |
@@ -115,13 +116,19 @@ func (s *PublishScheduler) Add(key string, item interface{}, size int) error { | |
<-s.workers | |
nlen := reflect.ValueOf(bundle).Len() | |
+ | |
s.mu.Lock() | |
+ defer s.mu.Unlock() | |
+ | |
+ if s.flushing { | |
+ return | |
+ } | |
+ | |
s.outstanding[key] -= nlen | |
if s.outstanding[key] == 0 { | |
delete(s.outstanding, key) | |
delete(s.bundlers, key) | |
} | |
- s.mu.Unlock() | |
}) | |
b.DelayThreshold = s.DelayThreshold | |
b.BundleCountThreshold = s.BundleCountThreshold | |
@@ -153,14 +160,30 @@ func (s *PublishScheduler) Add(key string, item interface{}, size int) error { | |
// blocks until all items have been flushed. | |
func (s *PublishScheduler) FlushAndStop() { | |
close(s.done) | |
+ | |
+ s.mu.Lock() | |
+ s.flushing = true | |
+ s.mu.Unlock() | |
+ | |
for _, b := range s.bundlers { | |
b.Flush() | |
} | |
+ | |
+ s.mu.Lock() | |
+ s.flushing = false | |
+ s.bundlers = make(map[string]*bundler.Bundler) | |
+ s.outstanding = make(map[string]int) | |
+ s.mu.Unlock() | |
} | |
// Flush waits until all bundlers are sent. | |
func (s *PublishScheduler) Flush() { | |
var wg sync.WaitGroup | |
+ | |
+ s.mu.Lock() | |
+ s.flushing = true | |
+ s.mu.Unlock() | |
+ | |
for _, b := range s.bundlers { | |
wg.Add(1) | |
go func(b *bundler.Bundler) { | |
@@ -169,6 +192,12 @@ func (s *PublishScheduler) Flush() { | |
}(b) | |
} | |
wg.Wait() | |
+ | |
+ s.mu.Lock() | |
+ s.flushing = false | |
+ s.bundlers = make(map[string]*bundler.Bundler) | |
+ s.outstanding = make(map[string]int) | |
+ s.mu.Unlock() | |
} | |
// IsPaused checks if the bundler associated with an ordering keys is |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment