Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Interface for testing nsq producers.
package nsqutils
import (
"sync"
"time"
"github.com/nsqio/go-nsq"
)
// Producer is the minimal publishing contract used by this package: a single
// Publish method that takes a topic name and a message body. nsq.Producer
// fulfills it, so a real producer can be used wherever a Producer is expected.
type Producer interface {
	// Publish sends body to topic and reports any failure as an error.
	Publish(topic string, body []byte) error
}
// TestProducer implements Producer and counts the messages published per
// topic. If its Producer field is non-nil, Publish also forwards each message
// to that nsq.Producer. The zero value is usable: Publish creates Counters on
// first use when it is nil.
type TestProducer struct {
// Counters maps a topic name to the number of Publish calls made for it.
// Publish allocates the map if it is nil; Reset sets it back to nil.
Counters map[string]int32
// Producer is optional; when set, Publish delegates to it after counting.
Producer *nsq.Producer
// LastMessage is the body given to the most recent Publish call; Reset
// replaces it with an empty slice.
LastMessage []byte
// Mutex is embedded and is locked by Reset, Count and Publish around their
// access to the fields above.
sync.Mutex
}
// Reset discards every per-topic counter and replaces LastMessage with an
// empty, non-nil slice. The receiver's mutex is held while the fields are
// updated.
func (p *TestProducer) Reset() {
	p.Lock()
	defer p.Unlock()

	p.LastMessage = []byte{}
	p.Counters = nil
}
// Count returns the total number of recorded messages, i.e. the sum of every
// per-topic entry in Counters. The sum is computed while holding the mutex.
func (p *TestProducer) Count() int32 {
	p.Lock()
	defer p.Unlock()

	total := int32(0)
	for topic := range p.Counters {
		total += p.Counters[topic]
	}
	return total
}
// Publish records a message for topic and, when a real producer is attached,
// forwards it.
//
// The per-topic counter in Counters is incremented (the map is created lazily,
// so a zero-value TestProducer and one that has just been Reset both work) and
// LastMessage is set to a private copy of body. The copy matters: callers may
// reuse or mutate their buffer after Publish returns, and storing the slice
// directly would let LastMessage change underneath the test inspecting it.
//
// If Producer is non-nil, the original body is passed to its Publish method
// and that call's error is returned; otherwise Publish returns nil. The
// message is counted before delegation, so a failed delegate publish is still
// counted. The mutex is held for the whole call, including the delegate
// publish.
func (p *TestProducer) Publish(topic string, body []byte) error {
	p.Lock()
	defer p.Unlock()

	if p.Counters == nil {
		p.Counters = make(map[string]int32)
	}
	p.Counters[topic]++

	// Snapshot the payload, preserving the nil / empty distinction of the input.
	var snapshot []byte
	if body != nil {
		snapshot = make([]byte, len(body))
		copy(snapshot, body)
	}
	p.LastMessage = snapshot

	if p.Producer != nil {
		return p.Producer.Publish(topic, body)
	}
	return nil
}
// NoOpMessageDelegate is a message delegate whose callbacks all do nothing.
// It can be used when constructing an nsq.Message by hand in a test, so that
// finishing, requeueing or touching that message is a no-op.
type NoOpMessageDelegate struct{}

// OnFinish ignores the finish notification for m.
func (*NoOpMessageDelegate) OnFinish(m *nsq.Message) {
}

// OnRequeue ignores the requeue notification for m, including its delay and
// backoff arguments.
func (*NoOpMessageDelegate) OnRequeue(m *nsq.Message, delay time.Duration, backoff bool) {
}

// OnTouch ignores the touch notification for m.
func (*NoOpMessageDelegate) OnTouch(m *nsq.Message) {
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.