Skip to content

Instantly share code, notes, and snippets.

@jehiah
Last active June 30, 2020 08:13
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jehiah/45af9135b6ecf5537646 to your computer and use it in GitHub Desktop.
Save jehiah/45af9135b6ecf5537646 to your computer and use it in GitHub Desktop.
NSQ Producer testing abstraction
package nsqutils
import (
"sync"
"time"
nsq "github.com/nsqio/go-nsq"
)
// Producer is an interface that nsq.Producer fulfills
type Producer interface {
Publish(topic string, body []byte) error
PublishAsync(topic string, body []byte, doneChan chan *nsq.ProducerTransaction, args ...interface{}) error
}
// TestProducer implements Producer and just counts the number of messages
// optionally if it includes a nsq.Producer it will also send messages
type TestProducer struct {
Counters map[string]int32
Producer *nsq.Producer
LastMessage []byte
LastTopic string
Messages [][]byte
sync.Mutex
}
// Reset the counters
func (p *TestProducer) Reset() {
p.Lock()
p.Counters = nil
p.LastMessage = make([]byte, 0)
p.LastTopic = ""
p.Messages = make([][]byte, 0)
p.Unlock()
}
// Count the total number of events
func (p *TestProducer) Count() int32 {
p.Lock()
defer p.Unlock()
var i int32
for _, c := range p.Counters {
i += c
}
return i
}
// Publish tracks publishing to a topic
func (p *TestProducer) Publish(topic string, body []byte) error {
p.Lock()
defer p.Unlock()
if p.Counters == nil {
p.Counters = make(map[string]int32)
}
p.Counters[topic]++
p.LastMessage = body
p.LastTopic = topic
p.Messages = append(p.Messages, body)
if p.Producer != nil {
return p.Producer.Publish(topic, body)
}
return nil
}
// PublishAsync is a wrapper on Publish
func (p *TestProducer) PublishAsync(topic string, body []byte, doneChan chan *nsq.ProducerTransaction, args ...interface{}) error {
return p.Publish(topic, body)
}
// NoOpMessageDelegate can be used when constructing a test nsq.Message to allow Finish/Requeue
// messages to be a NoOp
type NoOpMessageDelegate struct{}
func (d *NoOpMessageDelegate) OnFinish(m *nsq.Message) {}
func (d *NoOpMessageDelegate) OnRequeue(m *nsq.Message, delay time.Duration, backoff bool) {}
func (d *NoOpMessageDelegate) OnTouch(m *nsq.Message) {}
// NoOpMessage makes a message using NoOpMessageDelegate
func NoOpMessage(body string) *nsq.Message {
return &nsq.Message{Body: []byte(body), Delegate: &NoOpMessageDelegate{}}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment