@ripienaar
Last active April 10, 2020 11:17
package main

import (
	"strconv"
	"sync"
	"time"

	"github.com/nats-io/jsm.go"
	"github.com/nats-io/nats.go"
)
// JSPullBuffer is a buffered pull-based consumer designed to optimise
// latency on slow links as well as network usage.
//
// It tries to optimise for these things:
//
//   - Unstable networks, assuming connects and reconnects both in the
//     client and in the cluster. It does not rely on single, very long
//     running polls under high load, since on a 3G connection it's common
//     to become a slow consumer. Instead it polls semi frequently, with
//     the polls done on a backoff: when idle, only 1 poll a minute. This
//     could be made more intelligent by integrating with the client and
//     switching to frequent polls only after a disconnect is detected,
//     but that would still not know about disconnects in the cluster,
//     so this design strikes a balance.
//
//   - It tries to perform well when there is a burst of traffic by
//     switching to a more aggressive strategy for keeping the buffer
//     full, gradually becoming less aggressive once the burst is over.
//
//   - It tries to use as little network bandwidth as possible while
//     polling by using +NXT acks when there is traffic, each ack asking
//     for the next message. This avoids polling altogether when there is
//     a steady rate of messages; polling is only used when the buffer or
//     the traffic level is low.
//
// It builds a buffer of a certain size and, at start, requests that many
// messages from JetStream using the NEXT subject.
//
// It then regularly checks the size of the buffer and, when it reaches a
// low water mark of 50%, does another NEXT request for enough messages to
// fill the buffer back up.
//
// These polls are done on a backoff frequency: they start at once every
// 2 seconds but eventually happen only once a minute. If a burst of more
// than 10 messages arrives, the buffer length is again checked and polled
// frequently, trying its best to keep the buffer full; as traffic levels
// off it gradually returns to a 1 minute poll.
//
// Clients that use the buffer should call buffer.Ack(msg) to do
// acknowledgements. This uses +NXT style acks that ask the server for the
// next message on every ack, so when there is a lot of traffic this is
// often sufficient to keep the buffer at a good, stable level without any
// polls being done.
//
//   buffer, err := NewPullBuffer(nc, consumer, 64)
//   if err != nil {
//       panic(err)
//   }
//
//   for msg := range buffer.Q {
//       buffer.Ack(msg)
//   }
//
// Always call buffer.Close() to free up the resources; after Close the
// buffer should be discarded once all remaining messages are drained.
type JSPullBuffer struct {
	Q chan *nats.Msg

	sz       int
	closer   chan struct{}
	nextSubj string
	lowWater int
	interval time.Duration
	ib       string
	nc       *nats.Conn
	closed   bool
	backoff  backoffPolicy
	lastpoll time.Time

	sync.Mutex
}
// backoffPolicy holds an increasing series of poll intervals in milliseconds.
type backoffPolicy struct {
	Millis []int
}

// Duration returns the nth interval, clamped to the last entry once n
// runs past the end of the series.
func (b backoffPolicy) Duration(n int) time.Duration {
	if n >= len(b.Millis) {
		n = len(b.Millis) - 1
	}

	return time.Duration(b.Millis[n]) * time.Millisecond
}

// NewPullBuffer creates a pull buffer of the given size over consumer c,
// starts the polling worker and performs the initial NEXT request.
func NewPullBuffer(nc *nats.Conn, c *jsm.Consumer, buffer int) (*JSPullBuffer, error) {
	b := &JSPullBuffer{
		Q:        make(chan *nats.Msg, buffer+1),
		sz:       buffer,
		closer:   make(chan struct{}, 1),
		nextSubj: c.NextSubject(),
		lowWater: buffer / 2,
		interval: 10 * time.Millisecond,
		ib:       nats.NewInbox(),
		nc:       nc,
		backoff:  backoffPolicy{},
	}

	// back off from polling every 2 seconds up to once a minute, in 500ms steps
	for i := 2000; i <= 60000; i += 500 {
		b.backoff.Millis = append(b.backoff.Millis, i)
	}

	err := b.start()

	return b, err
}

// start subscribes to the poll inbox and launches the worker goroutine
// that keeps the buffer topped up.
func (j *JSPullBuffer) start() error {
	seen := 0
	trigger := make(chan int, 1)
	ticker := time.NewTicker(j.interval)
	pollfreq := j.interval

	sub, err := j.nc.Subscribe(j.ib, func(m *nats.Msg) {
		j.Q <- m

		j.Lock()
		seen++
		j.Unlock()
	})
	if err != nil {
		return err
	}

	// start first poll
	trigger <- j.sz

	worker := func() {
		idlecnt := 0

		for {
			select {
			case <-ticker.C:
				j.Lock()

				// we check frequently but only poll every pollfreq so we don't end up
				// flooding the link with NEXT requests in idle times
				if time.Since(j.lastpoll) < pollfreq {
					j.Unlock()
					continue
				}

				switch {
				// if nothing came in since the last poll we back off further,
				// so we poll less and less in idle times
				case seen == 0:
					pollfreq = j.backoff.Duration(idlecnt)
					idlecnt++

				// if a burst of more than 10 messages arrived since the last trigger we
				// reset the backoff and enter a period of fast buffer length checks and
				// polls that get slower and slower, remaining fast as long as there is
				// a lot of traffic
				case seen > 10:
					idlecnt = 0
					pollfreq = j.interval
				}

				j.Unlock()

				// below the low water mark, ask for enough messages to refill the buffer
				bl := len(j.Q)
				if bl < j.lowWater {
					trigger <- j.sz - bl
				}

			case want := <-trigger:
				j.Lock()
				j.lastpoll = time.Now()
				seen = 0
				j.Unlock()

				// ask the NEXT subject for `want` messages, delivered to our inbox
				j.nc.PublishRequest(j.nextSubj, j.ib, []byte(strconv.Itoa(want)))

			case <-j.closer:
				j.Lock()

				if j.closed {
					j.Unlock()
					return
				}

				ticker.Stop()
				sub.Unsubscribe()
				close(j.Q)
				j.closed = true

				j.Unlock()

				return
			}
		}
	}

	go worker()

	return nil
}

// Ack acknowledges a message from the buffer, using +NXT acks while the
// buffer is healthy so the acknowledgement doubles as a request for the
// next message.
func (j *JSPullBuffer) Ack(m *nats.Msg) {
	// only do +NXT when we are above the low water mark to try to keep things
	// stable, but if we fall below it it's best to let the worker do a NEXT
	// poll for many messages at a time so the buffer can build up again
	if len(j.Q) >= j.lowWater {
		j.nc.PublishRequest(m.Reply, j.ib, []byte("+NXT"))

		j.Lock()
		j.lastpoll = time.Now()
		j.Unlock()
	} else {
		j.nc.PublishRequest(m.Reply, j.ib, nil)
	}
}

// Close stops the worker and closes the buffer channel; messages already in
// Q can still be drained afterwards.
func (j *JSPullBuffer) Close() {
	close(j.closer)
}
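
// A minimal wiring sketch showing the buffer in use. It assumes the jsm.go
// Manager API (jsm.New and Manager.LoadConsumer) and uses hypothetical
// stream and consumer names ("ORDERS" / "NEW"); adapt both to your setup
// and to the jsm.go version in use.
func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	mgr, err := jsm.New(nc)
	if err != nil {
		panic(err)
	}

	// load an existing pull consumer; the names here are placeholders
	consumer, err := mgr.LoadConsumer("ORDERS", "NEW")
	if err != nil {
		panic(err)
	}

	buffer, err := NewPullBuffer(nc, consumer, 64)
	if err != nil {
		panic(err)
	}
	defer buffer.Close()

	for msg := range buffer.Q {
		// process msg here, then acknowledge so the server sends the next one
		buffer.Ack(msg)
	}
}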