Last active
April 10, 2020 11:17
-
-
Save ripienaar/d266a87a46d5c0ea65540cb065d11d20 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import (
	"errors"
	"strconv"
	"sync"
	"time"

	"github.com/nats-io/jsm.go"
	"github.com/nats-io/nats.go"
)
// JSPullBuffer is a buffered pull based consumer that is designed to optimise | |
// latency on slow links as well as network usage | |
// | |
// I try to optimise for these things: | |
// | |
// - Unstable networks and assuming connects and reconnects both in the | |
// client and the cluster, so I do not rely on single very long running | |
// polls under high load scenarios since its common on a 3G connection | |
// that you might become a slow consumer, I'd rather poll semi frequently | |
// however polls are done on a backoff, when idle only 1 poll a minute | |
// this could be made more intelligent by integrating with the client | |
// and detecting disconnects and switching to frequent polls only when | |
// it disconnects, but that would still not know about disconnects in | |
// the cluster so this design strikes a balance | |
// | |
// - It tries to perform fast at times when there is a burst of traffic | |
// switching to a more aggressive strategy for keeping the buffer full | |
// when there is a burst of traffic, this will gradually become less | |
// aggressive once the burst is over | |
// | |
// - It tries to use as little as possible network bandwidth while polling | |
// by using +NXT acks when there is traffic each time asking for the next | |
// message on acks, this avoids polling all together when there is a steady | |
// rate of messages. Polling will be used on low buffers or low traffic | |
// situation | |
// | |
// It builds a buffer of a certain size and request from JS that many messages | |
// at start using the NEXT subject | |
// | |
// It will then regularly check the size of the buffer and if it reaches a low | |
// water mark of 50% it will do a NEXT again with enough messages to fill up the | |
// buffer | |
// | |
// These polls are done on a backoff frequency: they start once every 2 seconds
// and eventually slow down to once a minute. If a burst of more than 10
// messages arrives it will again check and poll frequently, trying its best
// to keep the buffer full; as traffic levels off it gradually returns to a
// 1 minute poll
// | |
// Clients that use the buffer should call buffer.Ack(msg) to acknowledge messages.
// This uses +NXT style acks that ask the server for the next message at every
// ack, so when there is a lot of traffic this is often sufficient to keep the
// buffer at a good stable level and no polls are needed
// | |
// buffer, err := NewPullBuffer(nc, consumer, 64) | |
// if err != nil { | |
// panic(err) | |
// } | |
// | |
// for msg := range buffer.Q { | |
// buffer.Ack(msg) | |
// } | |
// | |
// Always call buffer.Close() to free up the resources, after close the buffer should be | |
// discarded once all the messages are drained | |
type JSPullBuffer struct { | |
Q chan *nats.Msg | |
sz int | |
closer chan struct{} | |
nextSubj string | |
lowWater int | |
interval time.Duration | |
ib string | |
nc *nats.Conn | |
closed bool | |
backoff backoffPolicy | |
lastpoll time.Time | |
sync.Mutex | |
} | |
// backoffPolicy is an ordered list of delays used to slow polling down the
// longer the buffer stays idle.
type backoffPolicy struct {
	// Millis holds the delay for each successive step, in milliseconds.
	Millis []int
}

// Duration returns the delay for step n.
//
// Steps beyond the end of the policy use the last (longest) entry and
// negative steps use the first one. An empty policy yields a zero duration
// instead of panicking on an out-of-range index.
func (b backoffPolicy) Duration(n int) time.Duration {
	if len(b.Millis) == 0 {
		return 0
	}

	// clamp n into the valid index range
	if n < 0 {
		n = 0
	}
	if n >= len(b.Millis) {
		n = len(b.Millis) - 1
	}

	return time.Duration(b.Millis[n]) * time.Millisecond
}
func NewPullBuffer(nc *nats.Conn, c *jsm.Consumer, buffer int) (*JSPullBuffer, error) { | |
b := &JSPullBuffer{ | |
Q: make(chan *nats.Msg, buffer+1), | |
sz: buffer, | |
closer: make(chan struct{}, 1), | |
nextSubj: c.NextSubject(), | |
lowWater: buffer / 2, | |
interval: 10 * time.Millisecond, | |
ib: nats.NewInbox(), | |
nc: nc, | |
backoff: backoffPolicy{}, | |
} | |
for i := 2000; i <= 60000; i += 500 { | |
b.backoff.Millis = append(b.backoff.Millis, i) | |
} | |
err := b.start() | |
return b, err | |
} | |
func (j *JSPullBuffer) start() error { | |
seen := 0 | |
trigger := make(chan int, 1) | |
ticker := time.NewTicker(j.interval) | |
pollfreq := j.interval | |
sub, err := j.nc.Subscribe(j.ib, func(m *nats.Msg) { | |
j.Q <- m | |
j.Lock() | |
seen++ | |
j.Unlock() | |
}) | |
if err != nil { | |
return err | |
} | |
// start first poll | |
trigger <- j.sz | |
worker := func() { | |
idlecnt := 0 | |
for { | |
select { | |
case <-ticker.C: | |
j.Lock() | |
// we check frequently but only poll pollfreq so we dont end up flooding the link | |
// with next requests in idle times | |
if time.Now().Sub(j.lastpoll) < pollfreq { | |
j.Unlock() | |
continue | |
} | |
switch { | |
// if nothing came in since the last poll we increase the frequency | |
// so we poll less and less and less in idle times | |
case seen == 0: | |
pollfreq = j.backoff.Duration(idlecnt) | |
idlecnt++ | |
// if we had a little burst of 10 messages arrived since last trigger we reset the backoff | |
// and enter a period of fast buffer len checks and poll that gets slower and slower | |
// and will remain fast as long as there is a lot of traffic | |
case seen > 10: | |
idlecnt = 0 | |
pollfreq = j.interval | |
} | |
j.Unlock() | |
bl := len(j.Q) | |
if bl < j.lowWater { | |
trigger <- j.sz - bl | |
} | |
case want := <-trigger: | |
j.Lock() | |
j.lastpoll = time.Now() | |
seen = 0 | |
j.Unlock() | |
nc.PublishRequest(j.nextSubj, j.ib, []byte(strconv.Itoa(want))) | |
case <-j.closer: | |
j.Lock() | |
if j.closed { | |
j.Unlock() | |
return | |
} | |
ticker.Stop() | |
sub.Unsubscribe() | |
close(j.Q) | |
j.closed = true | |
j.Unlock() | |
return | |
} | |
} | |
} | |
go worker() | |
return nil | |
} | |
func (j *JSPullBuffer) Ack(m *nats.Msg) { | |
// only do +NXT when we are above the low water to try to keep things stable | |
// but if we fall below low water its best to let the watched to a NEXT pull | |
// for many messages a time so the buffer can build up again | |
if len(j.Q) >= j.lowWater { | |
j.nc.PublishRequest(m.Reply, j.ib, []byte("+NXT")) | |
j.Lock() | |
j.lastpoll = time.Now() | |
j.Unlock() | |
} else { | |
j.nc.PublishRequest(m.Reply, j.ib, nil) | |
} | |
} | |
func (j *JSPullBuffer) Close() { | |
close(j.closer) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment