Skip to content

Instantly share code, notes, and snippets.

@ripienaar
Created April 8, 2020 14:51
Show Gist options
  • Save ripienaar/04de8e6a798a24644b379cb7457bea23 to your computer and use it in GitHub Desktop.
Save ripienaar/04de8e6a798a24644b379cb7457bea23 to your computer and use it in GitHub Desktop.
func inFlightBuffer(_ *jsm.Stream, consumer *jsm.Consumer, tests int) error {
buffer := make(chan *nats.Msg, 1000)
ib := nats.NewInbox()
trigger := make(chan struct{})
done := make(chan struct{})
_, err := nc.ChanSubscribe(ib, buffer)
if err != nil {
return err
}
filler := func() {
expected := 0
for {
select {
case <-trigger:
rs := 64 - expected
if rs > 0 {
log.Printf("Fetching %d", rs)
nc.PublishRequest(consumer.NextSubject(), ib, []byte(strconv.Itoa(rs)))
expected += rs
}
case _, ok := <-done:
if !ok {
return
}
}
}
}
go filler()
// fetch initial data
trigger <- struct{}{}
cnt := 1
for {
select {
case msg := <-buffer:
// time.Sleep(time.Millisecond)
nc.PublishRequest(msg.Reply, ib, server.AckNext)
log.Printf("buffer size: %d/%d (%s)", len(buffer), cap(buffer), string(msg.Data))
if cnt == tests {
close(done)
continue
}
cnt++
if len(buffer) < cap(buffer)/2 {
trigger <- struct{}{}
}
case _, ok := <-done:
if !ok {
return nil
}
}
}
return nil
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment